diff --git a/.gitignore b/.gitignore index 637d5966..fb8eb90b 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ .gitignore.swp .project .settings +.DS_Store target bin /flink.http.connector.iml diff --git a/CHANGELOG.md b/CHANGELOG.md index e45bb152..b7db3185 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ## [Unreleased] +- Added ability to continue on error, introducing new metadata columns and new configuration option + `gid.connector.http.source.lookup.continue-on-error` + ## [0.21.0] - 2025-09-16 - optimized logging in HttpHeaderUtils. diff --git a/README.md b/README.md index 211eff4a..019c07a8 100644 --- a/README.md +++ b/README.md @@ -176,16 +176,55 @@ The second one is set per individual HTTP requests by HTTP client. Its default v Flink's current implementation of `AsyncTableFunction` does not allow specifying custom logic for handling Flink AsyncIO timeouts as it is for Java API. Because of that, if AsyncIO timer passes, Flink will throw TimeoutException which will cause job restart. -#### Retries (Lookup source) +#### Available Metadata (Lookup source) + +The metadata column `http-status-code`, if specified in the table definition, will get the HTTP status code. +The metadata column `http-headers-map `, if specified in the table definition, will get a map of the HTTP headers. + +HTTP requests can fail either immediately or after temporary error retries. The usual behaviour after such failures is to end the job. If you would like to continue +processing after these failures then specify `gid.connector.http.source.lookup.continue-on-error` as true. THe lookup join will complete without content in the expected enrichment columns from the http call, +this means that these columns will be null for nullable columns and hold a default value for the type for non-nullable columns. + +When using `gid.connector.http.source.lookup.continue-on-error` as true, consider adding extra metadata columns that will surface information about failures into your stream. + +Metadata columns can be specified and hold http information. They are optional read-only columns that must be declared VIRTUAL to exclude them during an INSERT INTO operation. + +| Key | Data Type | Description | +|-----------------------|----------------------------------|----------------------------------------| +| error-string | STRING NULL | A message associated with the error | +| http-status-code | INT NULL | The HTTP status code | +| http-headers-map | MAP > NULL | The headers returned with the response | +| http-completion-state | STRING NULL | The completion state of the http call. | + +##### http-completion-state possible values + +| Value | Description | +|:------------------|------------------------| +| SUCCESS | Success | +| HTTP_ERROR_STATUS | HTTP error status code | +| EXCEPTION | An Exception occurred | + +If the `error-string` metadata column is defined on the table and the call succeeds then it will have a null value. + +When a http lookup call fails and populates the metadata columns with the error information, the expected enrichment columns from the http call +are not populated, this means that they will be null for nullable columns and hold a default value for the type for non-nullable columns. + +If you are using the Table API `TableResult` and have an `await` with a timeout, this Timeout exception will cause the job to terminate, +even if there are metadata columns defined. + +#### Retries and handling errors (Lookup source) Lookup source handles auto-retries for two scenarios: 1. IOException occurs (e.g. temporary network outage) 2. The response contains a HTTP error code that indicates a retriable error. These codes are defined in the table configuration (see `gid.connector.http.source.lookup.retry-codes`). -Retries are executed silently, without restarting the job. After reaching max retries attempts (per request) operation will fail and restart job. +Retries are executed silently, without restarting the job. Notice that HTTP codes are categorized into into 3 groups: - successful responses - response is returned immediately for further processing - temporary errors - request will be retried up to the retry limit -- error responses - unexpected responses are not retried and will fail the job. Any HTTP error code which is not configured as successful or temporary error is treated as an unretriable error. +- error responses - unexpected responses are not retried. Any HTTP error code which is not configured as successful or temporary error is treated as an unretriable error. + +For temporary errors that have reached max retries attempts (per request) and error responses, the operation will +succeed if `gid.connector.http.source.lookup.continue-on-error` is true, otherwise the job will fail. ##### Retry strategy User can choose retry strategy type for source table: @@ -555,6 +594,7 @@ be requested if the current time is later than the cached token expiry time minu | gid.connector.http.source.lookup.retry-strategy.exponential-delay.initial-backoff | optional | Exponential-delay initial delay. Default 1 second. | | gid.connector.http.source.lookup.retry-strategy.exponential-delay.max-backoff | optional | Exponential-delay maximum delay. Default 1 minute. Use with `lookup.max-retries` parameter. | | gid.connector.http.source.lookup.retry-strategy.exponential-delay.backoff-multiplier | optional | Exponential-delay multiplier. Default value 1.5 | +| gid.connector.http.source.lookup.continue-on-error | optional | When true, the flow will continue on errors, returning row content. When false (the default) the job ends on errors. | | gid.connector.http.source.lookup.proxy.host | optional | Specify the hostname of the proxy. | | gid.connector.http.source.lookup.proxy.port | optional | Specify the port of the proxy. | | gid.connector.http.source.lookup.proxy.username | optional | Specify the username used for proxy authentication. | diff --git a/src/main/java/com/getindata/connectors/http/internal/PollingClient.java b/src/main/java/com/getindata/connectors/http/internal/PollingClient.java index 925e10b6..ccdb5645 100644 --- a/src/main/java/com/getindata/connectors/http/internal/PollingClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/PollingClient.java @@ -1,21 +1,21 @@ package com.getindata.connectors.http.internal; -import java.util.Collection; - import org.apache.flink.table.data.RowData; import org.apache.flink.table.functions.FunctionContext; +import com.getindata.connectors.http.internal.table.lookup.HttpRowDataWrapper; + /** * A client that is used to get enrichment data from external component. */ -public interface PollingClient { +public interface PollingClient { /** * Gets enrichment data from external component using provided lookup arguments. * @param lookupRow A {@link RowData} containing request parameters. - * @return an optional result of data lookup. + * @return an optional result of data lookup with http information. */ - Collection pull(RowData lookupRow); + HttpRowDataWrapper pull(RowData lookupRow); /** * Initialize the client. diff --git a/src/main/java/com/getindata/connectors/http/internal/PollingClientFactory.java b/src/main/java/com/getindata/connectors/http/internal/PollingClientFactory.java index 7981558d..8c51d323 100644 --- a/src/main/java/com/getindata/connectors/http/internal/PollingClientFactory.java +++ b/src/main/java/com/getindata/connectors/http/internal/PollingClientFactory.java @@ -3,14 +3,15 @@ import java.io.Serializable; import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.table.data.RowData; import org.apache.flink.util.ConfigurationException; import com.getindata.connectors.http.internal.table.lookup.HttpLookupConfig; -public interface PollingClientFactory extends Serializable { +public interface PollingClientFactory extends Serializable { - PollingClient createPollClient( + PollingClient createPollClient( HttpLookupConfig options, - DeserializationSchema schemaDecoder + DeserializationSchema schemaDecoder ) throws ConfigurationException; } diff --git a/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java b/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java index fa47529f..c6e436ad 100644 --- a/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java +++ b/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java @@ -88,6 +88,9 @@ public final class HttpConnectorConfigConstants { public static final String SOURCE_CONNECTION_TIMEOUT = SOURCE_LOOKUP_PREFIX + "connection.timeout"; + public static final String CONTINUE_ON_ERROR = + SOURCE_LOOKUP_PREFIX + "continue-on-error"; + public static final String SOURCE_PROXY_HOST = SOURCE_LOOKUP_PREFIX + "proxy.host"; diff --git a/src/main/java/com/getindata/connectors/http/internal/retry/RetryStrategyType.java b/src/main/java/com/getindata/connectors/http/internal/retry/RetryStrategyType.java index b9c8876d..ad378775 100644 --- a/src/main/java/com/getindata/connectors/http/internal/retry/RetryStrategyType.java +++ b/src/main/java/com/getindata/connectors/http/internal/retry/RetryStrategyType.java @@ -8,8 +8,7 @@ @RequiredArgsConstructor(access = AccessLevel.PRIVATE) public enum RetryStrategyType { FIXED_DELAY("fixed-delay"), - EXPONENTIAL_DELAY("exponential-delay"), - ; + EXPONENTIAL_DELAY("exponential-delay"); private final String code; diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpCompletionState.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpCompletionState.java new file mode 100644 index 00000000..20c19068 --- /dev/null +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpCompletionState.java @@ -0,0 +1,7 @@ +package com.getindata.connectors.http.internal.table.lookup; + +public enum HttpCompletionState { + HTTP_ERROR_STATUS, + EXCEPTION, + SUCCESS +} diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java index b21eba93..3cf44e8e 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java @@ -82,6 +82,12 @@ public class HttpLookupConnectorOptions { .noDefaultValue() .withDescription("Http client connection timeout."); + public static final ConfigOption SOURCE_LOOKUP_CONTINUE_ON_ERROR = + ConfigOptions.key(CONTINUE_ON_ERROR) + .booleanType() + .defaultValue(false) + .withDescription("Continue job on error."); + public static final ConfigOption SOURCE_LOOKUP_PROXY_HOST = ConfigOptions.key(SOURCE_PROXY_HOST) .stringType() diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java index 129837c3..e405ae7d 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java @@ -1,7 +1,8 @@ package com.getindata.connectors.http.internal.table.lookup; -import java.util.ArrayList; -import java.util.List; +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.annotation.Nullable; import lombok.extern.slf4j.Slf4j; @@ -15,12 +16,13 @@ import org.apache.flink.table.connector.source.LookupTableSource; import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown; import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; +import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata; import org.apache.flink.table.connector.source.lookup.AsyncLookupFunctionProvider ; import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider; import org.apache.flink.table.connector.source.lookup.PartialCachingAsyncLookupProvider; import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider; import org.apache.flink.table.connector.source.lookup.cache.LookupCache; -import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.*; import org.apache.flink.table.factories.DynamicTableFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.functions.AsyncLookupFunction; @@ -42,7 +44,7 @@ @Slf4j public class HttpLookupTableSource - implements LookupTableSource, SupportsProjectionPushDown, SupportsLimitPushDown { + implements LookupTableSource, SupportsReadingMetadata, SupportsProjectionPushDown, SupportsLimitPushDown { private DataType physicalRowDataType; @@ -54,6 +56,16 @@ public class HttpLookupTableSource @Nullable private final LookupCache cache; + // -------------------------------------------------------------------------------------------- + // Mutable attributes + // -------------------------------------------------------------------------------------------- + + /** Data type that describes the final output of the source. */ + protected DataType producedDataType; + + /** Metadata that is appended at the end of a physical source row. */ + protected List metadataKeys; + public HttpLookupTableSource( DataType physicalRowDataType, HttpLookupConfig lookupConfig, @@ -100,7 +112,7 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContex dynamicTableFactoryContext ); - PollingClientFactory pollingClientFactory = + PollingClientFactory pollingClientFactory = createPollingClientFactory(lookupQueryCreator, lookupConfig); return getLookupRuntimeProvider(lookupRow, responseSchemaDecoder, pollingClientFactory); @@ -109,15 +121,28 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContex protected LookupRuntimeProvider getLookupRuntimeProvider(LookupRow lookupRow, DeserializationSchema responseSchemaDecoder, - PollingClientFactory + PollingClientFactory pollingClientFactory) { - + MetadataConverter[] metadataConverters={}; + if (this.metadataKeys != null) { + metadataConverters = this.metadataKeys.stream() + .map( + k -> + Stream.of(HttpLookupTableSource.ReadableMetadata.values()) + .filter(rm -> rm.key.equals(k)) + .findFirst() + .orElseThrow(IllegalStateException::new)) + .map(m -> m.converter) + .toArray(MetadataConverter[]::new); + } HttpTableLookupFunction dataLookupFunction = new HttpTableLookupFunction( pollingClientFactory, responseSchemaDecoder, lookupRow, - lookupConfig + lookupConfig, + metadataConverters, + this.producedDataType ); if (lookupConfig.isUseAsync()) { AsyncLookupFunction asyncLookupFunction = @@ -165,7 +190,7 @@ public boolean supportsNestedProjection() { return true; } - private PollingClientFactory createPollingClientFactory( + private PollingClientFactory createPollingClientFactory( LookupQueryCreator lookupQueryCreator, HttpLookupConfig lookupConfig) { @@ -256,4 +281,123 @@ private LookupSchemaEntry processRow(RowField rowField, int parentIndex RowData.createFieldGetter(type1, parentIndex)); } } + + @Override + public Map listReadableMetadata() { + final Map metadataMap = new LinkedHashMap<>(); + + decodingFormat.listReadableMetadata() + .forEach((key, value) -> metadataMap.put(key, value)); + + // according to convention, the order of the final row must be + // PHYSICAL + FORMAT METADATA + CONNECTOR METADATA + // where the format metadata has highest precedence + // add connector metadata + Stream.of(ReadableMetadata.values()).forEachOrdered(m -> metadataMap.putIfAbsent(m.key, m.dataType)); + return metadataMap; + } + + @Override + public void applyReadableMetadata(List metadataKeys, DataType producedDataType) { + // separate connector and format metadata + final List connectorMetadataKeys = new ArrayList<>(metadataKeys); + final Map formatMetadata = decodingFormat.listReadableMetadata(); + // store non connector keys and remove them from the connectorMetadataKeys. + List formatMetadataKeys = new ArrayList<>(); + Set metadataKeysSet = metadataKeys.stream().collect(Collectors.toSet()); + for (ReadableMetadata rm : ReadableMetadata.values()) { + String metadataKeyToCheck = rm.name(); + if (!metadataKeysSet.contains(metadataKeyToCheck)) { + formatMetadataKeys.add(metadataKeyToCheck); + connectorMetadataKeys.remove(metadataKeyToCheck); + } + } + // push down format metadata keys + if (formatMetadata.size() > 0) { + final List requestedFormatMetadataKeys = + formatMetadataKeys.stream().collect(Collectors.toList()); + decodingFormat.applyReadableMetadata(requestedFormatMetadataKeys); + } + this.metadataKeys = connectorMetadataKeys; + this.producedDataType = producedDataType; + } + + // -------------------------------------------------------------------------------------------- + // Metadata handling + // -------------------------------------------------------------------------------------------- + enum ReadableMetadata { + ERROR_STRING( + "error-string", + DataTypes.STRING(), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + public Object read(HttpRowDataWrapper httpRowDataWrapper) { + if (httpRowDataWrapper == null) { + return null; + } + return StringData.fromString(httpRowDataWrapper.getErrorMessage()); + } + }), + HTTP_STATUS_CODE( + "http-status-code", + DataTypes.INT(), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + public Object read(HttpRowDataWrapper httpRowDataWrapper) { + return (httpRowDataWrapper != null) ? httpRowDataWrapper.getHttpStatusCode() : null; + } + } + ), + HTTP_HEADERS( + "http-headers", + DataTypes.MAP(DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + public Object read(HttpRowDataWrapper httpRowDataWrapper) { + if (httpRowDataWrapper == null) { + return null; + } + Map> httpHeadersMap = httpRowDataWrapper.getHttpHeadersMap(); + if (httpHeadersMap == null) { + return null; + } + Map stringDataMap = new HashMap<>(); + for (String key : httpHeadersMap.keySet()) { + List strDataList = new ArrayList<>(); + httpHeadersMap.get(key).stream() + .forEach((c) -> strDataList.add(StringData.fromString(c))); + stringDataMap.put(StringData.fromString(key), new GenericArrayData(strDataList.toArray())); + } + return new GenericMapData(stringDataMap); + } + } + ), + HTTP_COMPLETION_STATE( + "http-completion-state", + DataTypes.STRING(), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + public Object read(HttpRowDataWrapper httpRowDataWrapper) { + if (httpRowDataWrapper == null) { + return null; + } + return StringData.fromString(httpRowDataWrapper.getHttpCompletionState().name()); + } + }) + ; + final String key; + + final DataType dataType; + final MetadataConverter converter; + + ReadableMetadata(String key, DataType dataType, MetadataConverter converter) { + this.key = key; + this.dataType = dataType; + this.converter = converter; + } + } } + diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpRowDataWrapper.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpRowDataWrapper.java new file mode 100644 index 00000000..aa0d1afb --- /dev/null +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpRowDataWrapper.java @@ -0,0 +1,34 @@ +package com.getindata.connectors.http.internal.table.lookup; + + +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import lombok.Builder; +import lombok.Data; +import org.apache.flink.table.data.RowData; + +/** + * This bean contains the RowData information (the response body as a flink RowData). + * It also contains information from the http response, namely the http headers map + * and the http status code where available. The extra information is for the metadata columns. + */ +@Builder +@Data +public class HttpRowDataWrapper { + private final Collection data; + private final String errorMessage; + private final Map> httpHeadersMap; + private final Integer httpStatusCode; + private final HttpCompletionState httpCompletionState; + + public boolean shouldIgnore() { + return (this.data != null + && this.data.isEmpty() + && this.errorMessage == null + && this.httpHeadersMap == null + && this.httpStatusCode == null + && httpCompletionState == HttpCompletionState.SUCCESS); + } +} diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java index 9c87ff47..9def454a 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java @@ -1,6 +1,6 @@ package com.getindata.connectors.http.internal.table.lookup; -import java.util.Collection; +import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import lombok.AccessLevel; @@ -8,9 +8,13 @@ import lombok.extern.slf4j.Slf4j; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.functions.FunctionContext; import org.apache.flink.table.functions.LookupFunction; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.types.RowKind; import com.getindata.connectors.http.internal.PollingClient; import com.getindata.connectors.http.internal.PollingClientFactory; @@ -19,7 +23,7 @@ @Slf4j public class HttpTableLookupFunction extends LookupFunction { - private final PollingClientFactory pollingClientFactory; + private final PollingClientFactory pollingClientFactory; private final DeserializationSchema responseSchemaDecoder; @@ -30,28 +34,33 @@ public class HttpTableLookupFunction extends LookupFunction { @VisibleForTesting @Getter(AccessLevel.PACKAGE) private final HttpLookupConfig options; - + private final DataType producedDataType; private transient AtomicInteger localHttpCallCounter; - - private transient PollingClient client; + private transient PollingClient client; + private final MetadataConverter[] metadataConverters; public HttpTableLookupFunction( - PollingClientFactory pollingClientFactory, + PollingClientFactory pollingClientFactory, DeserializationSchema responseSchemaDecoder, LookupRow lookupRow, - HttpLookupConfig options) { + HttpLookupConfig options, + MetadataConverter[] metadataConverters, + DataType producedDataType + ) { this.pollingClientFactory = pollingClientFactory; this.responseSchemaDecoder = responseSchemaDecoder; this.lookupRow = lookupRow; this.options = options; + this.metadataConverters = metadataConverters; + this.producedDataType = producedDataType; } @Override public void open(FunctionContext context) throws Exception { this.responseSchemaDecoder.open( - SerializationSchemaUtils - .createDeserializationInitContext(HttpTableLookupFunction.class)); + SerializationSchemaUtils + .createDeserializationInitContext(HttpTableLookupFunction.class)); this.localHttpCallCounter = new AtomicInteger(0); this.client = pollingClientFactory @@ -66,6 +75,53 @@ public void open(FunctionContext context) throws Exception { @Override public Collection lookup(RowData keyRow) { localHttpCallCounter.incrementAndGet(); - return client.pull(keyRow); + List outputList = new ArrayList<>(); + final int metadataArity = metadataConverters.length; + + HttpRowDataWrapper httpRowDataWrapper = client.pull(keyRow); + Collection httpCollector = httpRowDataWrapper.getData(); + + int physicalArity=-1; + + GenericRowData producedRow = null; + if (httpRowDataWrapper.shouldIgnore()) { + return Collections.emptyList(); + } + // grab the actual data if there is any from the response and populate the producedRow with it + if (!httpCollector.isEmpty()) { + // TODO original code increments again if empty - removing + // if (httpCollector.isEmpty()) { + // localHttpCallCounter.incrementAndGet(); + //} else { + + GenericRowData physicalRow = (GenericRowData) httpCollector.iterator().next(); + physicalArity = physicalRow.getArity(); + producedRow = new GenericRowData(physicalRow.getRowKind(), physicalArity + metadataArity); + // We need to copy in the physical row into the producedRow + for (int pos = 0; pos < physicalArity; pos++) { + producedRow.setField(pos, physicalRow.getField(pos)); + } + } + // if we did not get the physical arity from the http response physical row then get it from the + // producedDataType. which is set when we have metadata + if (physicalArity == -1 && producedDataType != null ) { + List childrenLogicalTypes=producedDataType.getLogicalType().getChildren(); + physicalArity=childrenLogicalTypes.size()-metadataArity; + } + // if there was no data, create an empty producedRow + if (producedRow == null) { + producedRow = new GenericRowData(RowKind.INSERT, physicalArity + metadataArity); + } + // add any metadata + if (producedDataType != null ) { + for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) { + producedRow.setField( + physicalArity + metadataPos, + metadataConverters[metadataPos].read(httpRowDataWrapper)); + } + } + outputList.add(producedRow); + return outputList; } } + diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java index 956b2f02..df2c3992 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java @@ -29,6 +29,7 @@ import org.apache.flink.util.StringUtils; import com.getindata.connectors.http.HttpPostRequestCallback; +import com.getindata.connectors.http.HttpStatusCodeValidationFailedException; import com.getindata.connectors.http.internal.HeaderPreprocessor; import com.getindata.connectors.http.internal.PollingClient; import com.getindata.connectors.http.internal.retry.HttpClientWithRetry; @@ -37,17 +38,19 @@ import com.getindata.connectors.http.internal.status.HttpResponseChecker; import com.getindata.connectors.http.internal.utils.HttpHeaderUtils; import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.RESULT_TYPE; +import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_CONTINUE_ON_ERROR; import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_HTTP_IGNORED_RESPONSE_CODES; import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_HTTP_RETRY_CODES; import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_HTTP_SUCCESS_CODES; import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST; + /** * An implementation of {@link PollingClient} that uses Java 11's {@link HttpClient}. * This implementation supports HTTP traffic only. */ @Slf4j -public class JavaNetHttpPollingClient implements PollingClient { +public class JavaNetHttpPollingClient implements PollingClient { private static final String RESULT_TYPE_SINGLE_VALUE = "single-value"; private static final String RESULT_TYPE_ARRAY = "array"; @@ -59,6 +62,7 @@ public class JavaNetHttpPollingClient implements PollingClient { private final HttpPostRequestCallback httpPostRequestCallback; private final HttpLookupConfig options; private final Set ignoredErrorCodes; + private final boolean continueOnError; public JavaNetHttpPollingClient( HttpClient httpClient, @@ -71,7 +75,6 @@ public JavaNetHttpPollingClient( this.objectMapper = new ObjectMapper(); this.httpPostRequestCallback = options.getHttpPostRequestCallback(); this.options = options; - var config = options.getReadableConfig(); this.ignoredErrorCodes = HttpCodesParser.parse(config.get(SOURCE_LOOKUP_HTTP_IGNORED_RESPONSE_CODES)); @@ -79,6 +82,7 @@ public JavaNetHttpPollingClient( var successCodes = new HashSet(); successCodes.addAll(HttpCodesParser.parse(config.get(SOURCE_LOOKUP_HTTP_SUCCESS_CODES))); successCodes.addAll(ignoredErrorCodes); + this.continueOnError = config.get(SOURCE_LOOKUP_CONTINUE_ON_ERROR); this.httpClient = HttpClientWithRetry.builder() .httpClient(httpClient) @@ -91,11 +95,13 @@ public void open(FunctionContext context) { httpClient.registerMetrics(context.getMetricGroup()); } - @Override - public Collection pull(RowData lookupRow) { + public HttpRowDataWrapper pull(RowData lookupRow) { if (lookupRow == null) { - return Collections.emptyList(); + return HttpRowDataWrapper.builder() + .data(Collections.emptyList()) + .httpCompletionState(HttpCompletionState.SUCCESS) + .build(); } try { log.debug("Collection pull with Rowdata={}.", lookupRow); @@ -105,13 +111,42 @@ public Collection pull(RowData lookupRow) { } } - private Collection queryAndProcess(RowData lookupData) throws Exception { + private HttpRowDataWrapper queryAndProcess(RowData lookupData) throws Exception { var request = requestFactory.buildLookupRequest(lookupData); var oidcProcessor = HttpHeaderUtils.createOIDCHeaderPreprocessor(options.getReadableConfig()); - var response = httpClient.send( - () -> updateHttpRequestIfRequired(request, oidcProcessor), BodyHandlers.ofString()); - return processHttpResponse(response, request); + HttpResponse response =null; + HttpRowDataWrapper httpRowDataWrapper = null; + try { + response = httpClient.send( + () -> updateHttpRequestIfRequired(request, oidcProcessor), BodyHandlers.ofString()); + } catch (HttpStatusCodeValidationFailedException e) { + // Case 1 http non successful response + if (!this.continueOnError) throw e; + // use the response in the Exception + response = (HttpResponse) e.getResponse(); + httpRowDataWrapper = processHttpResponse(response, request, true); + } catch (Exception e) { + // Case 2 Exception occurred + if (!this.continueOnError) throw e; + String errMessage = e.getMessage(); + // some exceptions do not have messages including the java.net.ConnectException we can get here if + // the connection is bad. + if (errMessage == null) { + errMessage = e.toString(); + } + return HttpRowDataWrapper.builder() + .data(Collections.emptyList()) + .errorMessage(errMessage) + .httpCompletionState(HttpCompletionState.EXCEPTION) + .build(); + } + if (httpRowDataWrapper == null) { + // Case 3 Successful path. + httpRowDataWrapper = processHttpResponse(response, request, false); + } + + return httpRowDataWrapper; } /** @@ -161,9 +196,18 @@ protected HttpRequest updateHttpRequestIfRequired(HttpLookupSourceRequestEntry r return httpRequest; } - private Collection processHttpResponse( + /** + * Process the http response. + * @param response http response + * @param request http request + * @param isError whether the http response is an error (i.e. not successful after the retry + * processing and accounting for the config) + * @return HttpRowDataWrapper http row information and http error information + */ + private HttpRowDataWrapper processHttpResponse( HttpResponse response, - HttpLookupSourceRequestEntry request) throws IOException { + HttpLookupSourceRequestEntry request, + boolean isError) throws IOException { this.httpPostRequestCallback.call(response, request, "endpoint", Collections.emptyMap()); @@ -171,11 +215,40 @@ private Collection processHttpResponse( log.debug("Received status code [{}] for RestTableSource request with Server response body [{}] ", response.statusCode(), responseBody); - - if (StringUtils.isNullOrWhitespaceOnly(responseBody) || ignoreResponse(response)) { - return Collections.emptyList(); + if (!isError && (StringUtils.isNullOrWhitespaceOnly(responseBody) || ignoreResponse(response))) { + return HttpRowDataWrapper.builder() + .data(Collections.emptyList()) + .httpCompletionState(HttpCompletionState.SUCCESS) + .build(); + } else { + if (isError) { + return HttpRowDataWrapper.builder() + .data(Collections.emptyList()) + .errorMessage(responseBody) + .httpHeadersMap(response.headers().map()) + .httpStatusCode(response.statusCode()) + .httpCompletionState(HttpCompletionState.HTTP_ERROR_STATUS) + .build(); + } else { + Collection rowData = Collections.emptyList(); + HttpCompletionState httpCompletionState= HttpCompletionState.SUCCESS; + String errMessage = null; + try { + rowData = deserialize(responseBody); + } catch (IOException e) { + if (!this.continueOnError) throw e; + httpCompletionState = HttpCompletionState.EXCEPTION; + errMessage = e.getMessage(); + } + return HttpRowDataWrapper.builder() + .data(rowData) + .errorMessage(errMessage) + .httpHeadersMap(response.headers().map()) + .httpStatusCode(response.statusCode()) + .httpCompletionState( httpCompletionState) + .build(); + } } - return deserialize(responseBody); } @VisibleForTesting diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientFactory.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientFactory.java index 61ffe21f..f80ab9d3 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientFactory.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientFactory.java @@ -9,7 +9,7 @@ import com.getindata.connectors.http.internal.PollingClientFactory; import com.getindata.connectors.http.internal.utils.JavaNetHttpClientFactory; -public class JavaNetHttpPollingClientFactory implements PollingClientFactory { +public class JavaNetHttpPollingClientFactory implements PollingClientFactory { private final HttpRequestFactory requestFactory; diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/MetadataConverter.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/MetadataConverter.java new file mode 100644 index 00000000..8332d501 --- /dev/null +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/MetadataConverter.java @@ -0,0 +1,15 @@ +package com.getindata.connectors.http.internal.table.lookup; +import java.io.Serializable; + +/** + * The metadata converters have a read method that is passed a HttpRowDataWrapper. The implementations + * pick out the appropriate value of the metadata from this object. + */ +interface MetadataConverter extends Serializable { + /** + * + * @param httpRowDataWrapper an object that contains all metadata content + * @return the metadata value for this MetadataConverter. + */ + Object read(HttpRowDataWrapper httpRowDataWrapper); +} diff --git a/src/test/java/com/getindata/connectors/http/internal/retry/HttpClientWithRetryTest.java b/src/test/java/com/getindata/connectors/http/internal/retry/HttpClientWithRetryTest.java index fe3a7254..579fab34 100644 --- a/src/test/java/com/getindata/connectors/http/internal/retry/HttpClientWithRetryTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/retry/HttpClientWithRetryTest.java @@ -26,7 +26,6 @@ import com.getindata.connectors.http.HttpStatusCodeValidationFailedException; import com.getindata.connectors.http.internal.status.HttpResponseChecker; - @SuppressWarnings("unchecked") @ExtendWith(MockitoExtension.class) class HttpClientWithRetryTest { @@ -97,6 +96,7 @@ void shouldFailOnError() throws IOException, InterruptedException { when(responseChecker.isSuccessful(response)).thenReturn(false); when(responseChecker.isTemporalError(response)).thenReturn(false); + assertThrows(HttpStatusCodeValidationFailedException.class, () -> client.send(mock(Supplier.class), mock(HttpResponse.BodyHandler.class))); diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/AsyncHttpTableLookupFunctionTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/AsyncHttpTableLookupFunctionTest.java index fb627480..c1b7a189 100644 --- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/AsyncHttpTableLookupFunctionTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/AsyncHttpTableLookupFunctionTest.java @@ -79,7 +79,7 @@ void shouldEvaluateInAsyncWay() throws InterruptedException { assertThat(latch.await(3, TimeUnit.SECONDS)) .withFailMessage( "Future complete in AsyncHttpTableLookupFunction was not called" - + " for at lest one event.") + + " for at least one event.") .isEqualTo(true); assertThat(result.size()).isEqualTo(rowKeys.length); @@ -120,7 +120,7 @@ void shouldHandleExceptionOnOneThread() throws InterruptedException { assertThat(latch.await(3, TimeUnit.SECONDS)) .withFailMessage( "Future complete in AsyncHttpTableLookupFunction was not called" - + " for at lest one event.") + + " for at least one event.") .isEqualTo(true); assertThat(wasException).isTrue(); @@ -159,13 +159,13 @@ void shouldHandleEmptyCollectionResult() throws InterruptedException { assertThat(latch.await(3, TimeUnit.SECONDS)) .withFailMessage( "Future complete in AsyncHttpTableLookupFunction was not called" - + " for at lest one event.") + + " for at least one event.") .isEqualTo(true); assertThat(completeCount.get()) .withFailMessage( "Future complete in AsyncHttpTableLookupFunction was not called" - + " for at lest one event.") + + " for at least one event.") .isEqualTo(rowKeys.length); // -1 since one will have one empty result. diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceITCaseTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceITCaseTest.java index aa099087..0a093eda 100644 --- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceITCaseTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceITCaseTest.java @@ -1,6 +1,8 @@ package com.getindata.connectors.http.internal.table.lookup; import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Comparator; import java.util.List; @@ -17,6 +19,8 @@ import com.github.tomakehurst.wiremock.matching.StringValuePattern; import com.github.tomakehurst.wiremock.stubbing.Scenario; import com.github.tomakehurst.wiremock.stubbing.StubMapping; +import lombok.Builder; +import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.restartstrategy.RestartStrategies; @@ -40,6 +44,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import static com.github.tomakehurst.wiremock.client.WireMock.*; import static org.assertj.core.api.Assertions.assertThat; @@ -640,6 +645,7 @@ void testLookupJoinOnRowWithRowType() throws Exception { "Basic dXNlcjpwYXNzd29yZA==, Basic dXNlcjpwYXNzd29yZA==, true", "Bearer dXNlcjpwYXNzd29yZA==, Bearer dXNlcjpwYXNzd29yZA==, true" }) + void testLookupWithUseRawAuthHeader( String authHeaderRawValue, String expectedAuthHeaderValue, @@ -674,7 +680,9 @@ private void testLookupJoinOnRowWithRowTypeImpl( // For testing the gid.connector.http.source.lookup.use-raw-authorization-header // configuration parameter: expectedAuthHeaderValue != null ? "Authorization" : null, - expectedAuthHeaderValue // expected value of extra header + expectedAuthHeaderValue, // expected value of extra header + null, + false ); String fields = @@ -924,24 +932,7 @@ private LookupCache getCache() { private @NotNull SortedSet testLookupJoin(String lookupTable, int maxRows) throws Exception { - String sourceTable = - "CREATE TABLE Orders (" - + "id STRING," - + " id2 STRING," - + " proc_time AS PROCTIME()" - + ") WITH (" - + "'connector' = 'datagen'," - + "'rows-per-second' = '1'," - + "'fields.id.kind' = 'sequence'," - + "'fields.id.start' = '1'," - + "'fields.id.end' = '" + maxRows + "'," - + "'fields.id2.kind' = 'sequence'," - + "'fields.id2.start' = '2'," - + "'fields.id2.end' = '" + (maxRows + 1) + "'" - + ")"; - - tEnv.executeSql(sourceTable); - tEnv.executeSql(lookupTable); + createLookupAndSourceTables(lookupTable, maxRows); // WHEN // SQL query that performs JOIN on both tables. @@ -958,16 +949,74 @@ private LookupCache getCache() { return getCollectedRows(result); } + private @NotNull SortedSet testLookupJoinWithMetadata(String lookupTable, int maxRows) throws Exception { + + createLookupAndSourceTables(lookupTable, maxRows); + + // WHEN + String joinQuery = + "SELECT o.id, o.id2, c.msg, c.uuid, c.isActive, c.balance, " + + "c.errStr, c.statusCode, c.headers, c.completionState FROM Orders AS o " + + "JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c " + + "ON o.id = c.id " + + "AND o.id2 = c.id2"; + + TableResult result = tEnv.executeSql(joinQuery); + + // THEN + return getCollectedRows(result); + } + + private void createLookupAndSourceTables(String lookupTable, int maxRows) { + String sourceTable = + "CREATE TABLE Orders (" + + "id STRING," + + " id2 STRING," + + " proc_time AS PROCTIME()" + + ") WITH (" + + "'connector' = 'datagen'," + + "'rows-per-second' = '1'," + + "'fields.id.kind' = 'sequence'," + + "'fields.id.start' = '1'," + + "'fields.id.end' = '" + maxRows + "'," + + "'fields.id2.kind' = 'sequence'," + + "'fields.id2.start' = '2'," + + "'fields.id2.end' = '" + (maxRows + 1) + "'" + + ")"; + + tEnv.executeSql(sourceTable); + tEnv.executeSql(lookupTable); + } + + private void assertResultsForSpec(TestSpec spec, Collection rows) { + if (spec.badStatus) { + assertEnrichedRowsNoDataBadStatus(rows); + } else if (spec.deserError) { + assertEnrichedRowsDeserException(rows); + } else if (spec.connectionError) { + assertEnrichedRowsException(rows); + } else if (spec.useMetadata) { + assertEnrichedRows(rows, true); + } else { + assertEnrichedRows(rows); + } + } + private void assertEnrichedRows(Collection collectedRows) { + assertEnrichedRows(collectedRows, false); + } + + private void assertEnrichedRows(Collection collectedRows, boolean withMetadata) { + + final int rowArity = withMetadata? 10 : 6; // validate every row and its column. assertAll(() -> { assertThat(collectedRows.size()).isEqualTo(4); int intElement = 0; for (Row row : collectedRows) { intElement++; - assertThat(row.getArity()).isEqualTo(6); - - // "id" nad "id2" columns should be different for every row. + assertThat(row.getArity()).isEqualTo(rowArity); + // "id" and "id2" columns should be different for every row. assertThat(row.getField("id")).isEqualTo(String.valueOf(intElement)); assertThat(row.getField("id2")).isEqualTo(String.valueOf(intElement + 1)); @@ -975,11 +1024,98 @@ private void assertEnrichedRows(Collection collectedRows) { .isEqualTo("fbb68a46-80a9-46da-9d40-314b5287079c"); assertThat(row.getField("isActive")).isEqualTo(true); assertThat(row.getField("balance")).isEqualTo("$1,729.34"); + if (withMetadata) { + assertThat(row.getField("errStr")).isNull(); + assertThat(row.getField("headers")).isNotNull(); + assertThat(row.getField("statusCode")).isEqualTo(200); + assertThat(row.getField("completionState")).isEqualTo(HttpCompletionState.SUCCESS.name()); + } } } ); } + private void assertEnrichedRowsNoDataBadStatus(Collection collectedRows ) { + + final int rowArity = 10; + // validate every row and its column. + + assertAll(() -> { + assertThat(collectedRows.size()).isEqualTo(4); + int intElement = 0; + for (Row row : collectedRows) { + intElement++; + assertThat(row.getArity()).isEqualTo(rowArity); + // "id" and "id2" columns should be different for every row. + assertThat(row.getField("id")).isEqualTo(String.valueOf(intElement)); + assertThat(row.getField("id2")).isEqualTo(String.valueOf(intElement + 1)); + assertThat(row.getField("uuid")).isNull(); + assertThat(row.getField("isActive")).isNull(); + assertThat(row.getField("balance")).isNull(); + // metadata + assertThat(row.getField("errStr")).isEqualTo(""); + assertThat(row.getField("headers")).isNotNull(); + assertThat(row.getField("statusCode")).isEqualTo(500); + assertEquals(row.getField("completionState"), HttpCompletionState.HTTP_ERROR_STATUS.name()); + } + } + ); + } + + private void assertEnrichedRowsDeserException(Collection collectedRows ) { + + final int rowArity = 10; + // validate every row and its column. + + assertAll(() -> { + assertThat(collectedRows.size()).isEqualTo(4); + int intElement = 0; + for (Row row : collectedRows) { + intElement++; + assertThat(row.getArity()).isEqualTo(rowArity); + // "id" and "id2" columns should be different for every row. + assertThat(row.getField("id")).isEqualTo(String.valueOf(intElement)); + assertThat(row.getField("id2")).isEqualTo(String.valueOf(intElement + 1)); + assertThat(row.getField("uuid")).isNull(); + assertThat(row.getField("isActive")).isNull(); + assertThat(row.getField("balance")).isNull(); + // metadata + assertThat(row.getField("errStr")) + .isEqualTo("Failed to deserialize JSON 'A test string that is not json'."); + assertThat(row.getField("headers")).isNotNull(); + assertThat(row.getField("statusCode")).isEqualTo(200); + assertEquals(row.getField("completionState"), HttpCompletionState.EXCEPTION.name()); + } + } + ); + } + + private void assertEnrichedRowsException(Collection collectedRows ) { + + final int rowArity = 10; + // validate every row and its column. + + assertAll(() -> { + assertThat(collectedRows.size()).isEqualTo(4); + int intElement = 0; + for (Row row : collectedRows) { + intElement++; + assertThat(row.getArity()).isEqualTo(rowArity); + // "id" and "id2" columns should be different for every row. + assertThat(row.getField("id")).isEqualTo(String.valueOf(intElement)); + assertThat(row.getField("id2")).isEqualTo(String.valueOf(intElement + 1)); + assertThat(row.getField("uuid")).isNull(); + assertThat(row.getField("isActive")).isNull(); + assertThat(row.getField("balance")).isNull(); + // metadata + assertThat(row.getField("errStr")).isNotNull(); + assertThat(row.getField("headers")).isNull(); + assertThat(row.getField("statusCode")).isNull(); + assertEquals(row.getField("completionState"), HttpCompletionState.EXCEPTION.name()); + } + }); + } + @NotNull private SortedSet getCollectedRows(TableResult result) throws Exception { @@ -1029,15 +1165,33 @@ private void setUpServerBodyStub( String methodName, WireMockServer wireMockServer, List matchingJsonPaths) { - setUpServerBodyStub(methodName, wireMockServer, matchingJsonPaths, null, null); + setUpServerBodyStub(methodName, wireMockServer, matchingJsonPaths, null, null, null, false); + } + + private void setUpServerBodyStub( + String methodName, + WireMockServer wireMockServer, + List matchingJsonPaths, + Integer badStatus) { + setUpServerBodyStub(methodName, wireMockServer, matchingJsonPaths, null, null, badStatus, false); + } + + private void setUpServerBodyStub( + String methodName, + WireMockServer wireMockServer, + List matchingJsonPaths, boolean isDeserErr) { + setUpServerBodyStub(methodName, wireMockServer, matchingJsonPaths, null, null, null, isDeserErr); } + private void setUpServerBodyStub( String methodName, WireMockServer wireMockServer, List matchingJsonPaths, String extraHeader, - String expectedExtraHeaderValue) { + String expectedExtraHeaderValue, + Integer badStatus, + boolean isDeserErr) { MappingBuilder methodStub = (methodName.equalsIgnoreCase("PUT") ? put(urlEqualTo(ENDPOINT)) : @@ -1058,14 +1212,256 @@ private void setUpServerBodyStub( for (StringValuePattern pattern : matchingJsonPaths) { methodStub.withRequestBody(pattern); } - - methodStub - .willReturn( - aResponse() - .withTransformers(JsonTransform.NAME)); + if (badStatus == null) { + if (isDeserErr) { + methodStub.willReturn( + aResponse().withBody("A test string that is not json").withStatus(200)); + } else { + methodStub + .willReturn( + aResponse() + .withTransformers(JsonTransform.NAME)); + } + } else { + methodStub + .willReturn( + aResponse().withBody(new byte[0]).withStatus(500)); + } StubMapping stubMapping = wireMockServer.stubFor(methodStub); wireMockServer.addStubMapping(stubMapping); } + + // Prototype parameterizedTest + @ParameterizedTest + @MethodSource("testSpecProvider") + void testHttpLookupJoinParameterized(TestSpec spec) throws Exception { + // GIVEN + setupServerStubForSpec(spec); + + // Create lookup table SQL + String lookupTable = createLookupTableSql(spec); + + // WHEN + SortedSet rows =null; + boolean expectToContinue = spec.continueOnError && (spec.connectionError || spec.deserError || spec.badStatus); + try { + if (spec.useMetadata) { + rows = testLookupJoinWithMetadata(lookupTable, spec.maxRows); + } else { + rows = testLookupJoin(lookupTable, spec.maxRows); + } + // THEN + assertResultsForSpec(spec, rows); + } catch (Exception e) { + assertThat(expectToContinue).isFalse(); + } + } + + static Collection testSpecProvider() { + List specs = new ArrayList<>(); + + // Basic test cases (testHttpLookupJoin) + for (String method : Arrays.asList("GET", "POST", "PUT")) { + for (boolean asyncFlag : Arrays.asList(false, true)) { + for (boolean continueOnError : Arrays.asList(false, true)) { + specs.add(TestSpec.builder() + .testName("Basic HTTP Lookup Join") + .methodName(method) + .maxRows(4) + .useAsync(asyncFlag) + .continueOnError(continueOnError) + .build()); + } + } + } + + // Metadata success test cases (testHttpLookupJoinWithMetadataSuccess) + for (String method : Arrays.asList("GET", "POST", "PUT")) { + for (boolean asyncFlag : Arrays.asList(false, true)) { + for (boolean continueOnError : Arrays.asList(false, true)) { + specs.add(TestSpec.builder() + .methodName(method) + .testName("HTTP Lookup Join With Metadata Success") + .useMetadata(true) + .maxRows(4) + .useAsync(asyncFlag) + .continueOnError(continueOnError) + .build()); + } + } + } + + // Bad status test cases (testHttpLookupJoinWithMetadataBadStatus) + for (String method : Arrays.asList("GET", "POST", "PUT")) { + for (boolean asyncFlag : Arrays.asList(false, true)) { + for (boolean continueOnError : Arrays.asList(false, true)) { + specs.add(TestSpec.builder() + .testName("HTTP Lookup Join With Metadata Bad Status") + .methodName(method) + .useMetadata(true) + .maxRows(4) + .useAsync(asyncFlag) + .badStatus(true) + .continueOnError(continueOnError) + .build()); + } + } + } + + // Deserialization error test cases (testHttpLookupJoinWithMetadataDeserException) + for (String method : Arrays.asList("GET", "POST", "PUT")) { + for (boolean asyncFlag : Arrays.asList(false, true)) { + for (boolean continueOnError : Arrays.asList(false, true)) { + specs.add(TestSpec.builder() + .testName("HTTP Lookup Join With Metadata Deserialization Error") + .methodName(method) + .useMetadata(true) + .maxRows(4) + .useAsync(asyncFlag) + .deserError(true) + .continueOnError(continueOnError) + .build() + ); + } + } + } + + // Connection error test cases (testHttpLookupJoinWithMetadataException) + for (String method : Arrays.asList("GET", "POST", "PUT")) { + for (boolean asyncFlag : Arrays.asList(false, true)) { + for (boolean continueOnError : Arrays.asList(false, true)) { + specs.add(TestSpec.builder() + .testName("HTTP Lookup Join With Metadata Connection Error") + .methodName(method) + .useMetadata(true) + .maxRows(4) + .useAsync(asyncFlag) + .connectionError(true) + .continueOnError(continueOnError) + .build() + ); + } + } + } + + return specs; + } + @Builder + @Data + private static class TestSpec { + // Test identification + final String testName; + final String methodName; + + // Server stub configuration + final boolean useMetadata; + final boolean badStatus; + final boolean deserError; + final boolean connectionError; + + // Test execution configuration + final int maxRows; + final boolean useAsync; + final boolean continueOnError; + + @Override + public String toString() { + return testName + " [" + methodName + "]"; + } + } + + private void setupServerStubForSpec(TestSpec spec) { + if (spec.badStatus) { + // Setup for bad status test + if (StringUtils.isNullOrWhitespaceOnly(spec.methodName) || spec.methodName.equalsIgnoreCase("GET")) { + wireMockServer.stubFor(get(urlPathEqualTo(ENDPOINT)) + .withHeader("Content-Type", equalTo("application/json")) + .willReturn(aResponse().withBody(new byte[0]).withStatus(500)) + ); + } else { + setUpServerBodyStub( + spec.methodName, + wireMockServer, + List.of(matchingJsonPath("$.id"), matchingJsonPath("$.id2")), + Integer.valueOf(500) + ); + } + } else if (spec.deserError) { + // Setup for deserialization error test + if (StringUtils.isNullOrWhitespaceOnly(spec.methodName) || spec.methodName.equalsIgnoreCase("GET")) { + wireMockServer.stubFor(get(urlPathEqualTo(ENDPOINT)) + .withHeader("Content-Type", equalTo("application/json")) + .willReturn(aResponse().withBody("A test string that is not json").withStatus(200)) + ); + } else { + setUpServerBodyStub( + spec.methodName, + wireMockServer, + List.of(matchingJsonPath("$.id"), matchingJsonPath("$.id2")), + true + ); + } + } else if (spec.connectionError) { + // No need to set up server stub for connection error test + // The test will use a non-existent port (9091) + } else { + // Setup for success test + if (StringUtils.isNullOrWhitespaceOnly(spec.methodName) || spec.methodName.equalsIgnoreCase("GET")) { + setupServerStub(wireMockServer); + } else { + setUpServerBodyStub( + spec.methodName, + wireMockServer, + List.of(matchingJsonPath("$.id"), matchingJsonPath("$.id2")) + ); + } + } + } + + private String createLookupTableSql(TestSpec spec) { + StringBuilder sql = new StringBuilder(); + sql.append("CREATE TABLE Customers (") + .append("id STRING,") + .append("id2 STRING,") + .append("msg STRING,") + .append("uuid STRING,") + .append("details ROW<") + .append("isActive BOOLEAN,") + .append("nestedDetails ROW<") + .append("balance STRING") + .append(">") + .append(">"); + + if (spec.useMetadata) { + sql.append(",") + .append("errStr STRING METADATA FROM 'error-string',") + .append("statusCode INTEGER METADATA FROM 'http-status-code',") + .append("headers MAP> METADATA from 'http-headers',") + .append("completionState STRING METADATA from 'http-completion-state'"); + } + + sql.append(") WITH (") + .append("'format' = 'json',") + .append("'connector' = 'rest-lookup',"); + + if (!StringUtils.isNullOrWhitespaceOnly(spec.methodName)) { + sql.append("'lookup-method' = '").append(spec.methodName).append("',"); + } + + // URL with correct port for connection error test + if (spec.connectionError) { + sql.append("'url' = 'http://localhost:9091/client',"); + } else { + sql.append("'url' = 'http://localhost:9090/client',"); + } + sql.append("'gid.connector.http.source.lookup.header.Content-Type' = 'application/json',"); + sql.append("'gid.connector.http.source.lookup.continue-on-error'='true',"); + sql.append("'asyncPolling' = '").append(spec.useAsync ? "true" : "false").append("',") + .append("'table.exec.async-lookup.buffer-capacity' = '50',") + .append("'table.exec.async-lookup.timeout' = '120s'") + .append(")"); + return sql.toString(); + } } diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceTest.java index 86e81e32..d4684cd4 100644 --- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceTest.java @@ -1,11 +1,6 @@ package com.getindata.connectors.http.internal.table.lookup; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.metrics.groups.CacheMetricGroup; @@ -19,9 +14,14 @@ import org.apache.flink.table.connector.source.lookup.PartialCachingAsyncLookupProvider; import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider; import org.apache.flink.table.connector.source.lookup.cache.LookupCache; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericMapData; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; import org.apache.flink.table.runtime.connector.source.LookupRuntimeProviderContext; +import org.apache.flink.table.types.AtomicDataType; import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.IntType; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.BeforeEach; @@ -33,31 +33,34 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertTrue; +import static com.getindata.connectors.http.internal.table.lookup.HttpLookupTableSource.ReadableMetadata.ERROR_STRING; +import static com.getindata.connectors.http.internal.table.lookup.HttpLookupTableSource.ReadableMetadata.HTTP_COMPLETION_STATE; +import static com.getindata.connectors.http.internal.table.lookup.HttpLookupTableSource.ReadableMetadata.HTTP_HEADERS; +import static com.getindata.connectors.http.internal.table.lookup.HttpLookupTableSource.ReadableMetadata.HTTP_STATUS_CODE; import static com.getindata.connectors.http.internal.table.lookup.HttpLookupTableSourceFactory.row; - class HttpLookupTableSourceTest { public static final DataType PHYSICAL_ROW_DATA_TYPE = - row(List.of(DataTypes.FIELD("id", DataTypes.STRING().notNull()))); + row(List.of(DataTypes.FIELD("id", DataTypes.STRING().notNull()))); private static final ResolvedSchema SCHEMA = - new ResolvedSchema( - Arrays.asList( - Column.physical("id", DataTypes.STRING().notNull()), - Column.physical("msg", DataTypes.STRING().notNull()), - Column.physical("uuid", DataTypes.STRING().notNull()), - Column.physical("details", DataTypes.ROW( - DataTypes.FIELD("isActive", DataTypes.BOOLEAN()), - DataTypes.FIELD("nestedDetails", DataTypes.ROW( - DataTypes.FIELD("balance", DataTypes.STRING()) - ) - ) - ).notNull()) - ), - Collections.emptyList(), - UniqueConstraint.primaryKey("id", List.of("id")) - ); + new ResolvedSchema( + Arrays.asList( + Column.physical("id", DataTypes.STRING().notNull()), + Column.physical("msg", DataTypes.STRING().notNull()), + Column.physical("uuid", DataTypes.STRING().notNull()), + Column.physical("details", DataTypes.ROW( + DataTypes.FIELD("isActive", DataTypes.BOOLEAN()), + DataTypes.FIELD("nestedDetails", DataTypes.ROW( + DataTypes.FIELD("balance", DataTypes.STRING()) + ) + ) + ).notNull()) + ), + Collections.emptyList(), + UniqueConstraint.primaryKey("id", List.of("id")) + ); // lookupKey index {{0}} means first column. private final int[][] lookupKey = {{0}}; @@ -67,40 +70,128 @@ public void setUp() { LookupRow expectedLookupRow = new LookupRow(); expectedLookupRow.addLookupEntry( - new RowDataSingleValueLookupSchemaEntry( - "id", - RowData.createFieldGetter(DataTypes.STRING().notNull().getLogicalType(), 0) - ) + new RowDataSingleValueLookupSchemaEntry( + "id", + RowData.createFieldGetter(DataTypes.STRING().notNull().getLogicalType(), 0) + ) ); expectedLookupRow.setLookupPhysicalRowDataType(PHYSICAL_ROW_DATA_TYPE); } + @Test + void testListReadableMetadata() { + HttpLookupTableSource tableSource = + (HttpLookupTableSource) createTableSource(SCHEMA, getOptions()); + Map listMetadataMap = tableSource.listReadableMetadata(); + Map expectedMap = new LinkedHashMap<>(); + expectedMap.put(HTTP_STATUS_CODE.key, new AtomicDataType(new IntType(true))); + expectedMap.put(HTTP_HEADERS.key, DataTypes.MAP(DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING()))); + expectedMap.put(ERROR_STRING.key, DataTypes.STRING()); + expectedMap.put(HTTP_COMPLETION_STATE.key, DataTypes.STRING()); + + assertThat(listMetadataMap).isEqualTo(expectedMap); + } + + @Test + void testsummaryString() { + HttpLookupTableSource tableSource = + (HttpLookupTableSource) createTableSource(SCHEMA, getOptions()); + assertThat(tableSource.asSummaryString()).isEqualTo("Http Lookup Table Source"); + } + + @Test + void testreadReadableMetadata() { + HttpLookupTableSource tableSource = + (HttpLookupTableSource) createTableSource(SCHEMA, getOptions()); + final String testErrorString = "ABC"; + final int testStatusCode = 500; + final HttpCompletionState testCompletionState = HttpCompletionState.HTTP_ERROR_STATUS; + Map> testHeaders = new HashMap<>(); + testHeaders.put("AAA",List.of("BBB","CCC")); + testHeaders.put("DDD",List.of("EEE")); + HttpRowDataWrapper httpRowDataWrapper = HttpRowDataWrapper.builder() + .errorMessage(testErrorString) + .httpStatusCode(500) + .httpHeadersMap(testHeaders) + .httpCompletionState( testCompletionState) + .build(); + assertThat(ERROR_STRING.converter.read(httpRowDataWrapper)) + .isEqualTo(StringData.fromString(testErrorString)); + assertThat(ERROR_STRING.converter.read(null)) + .isNull(); + assertThat(HTTP_STATUS_CODE.converter.read(httpRowDataWrapper)) + .isEqualTo(Integer.valueOf(testStatusCode)); + assertThat(HTTP_STATUS_CODE.converter.read( null)) + .isNull(); + Object readResultForHeaders = HTTP_HEADERS.converter.read(httpRowDataWrapper); + assertThat(HTTP_HEADERS.converter.read( null)) + .isNull(); + assertThat(readResultForHeaders).isInstanceOf(GenericMapData.class); + GenericMapData mapData = (GenericMapData) readResultForHeaders; + + // Verify the map has the expected keys + ArrayData keys = mapData.keyArray(); + assertThat(keys.size()).isEqualTo(2); + + // Create a map to store the converted data for comparison + Map> actualMap = convertGenericMapDataToMap(mapData, keys); + // Now compare the extracted map with the expected map + assertThat(actualMap).isEqualTo(testHeaders); + + assertThat(HTTP_COMPLETION_STATE.converter.read(null)).isNull(); + + assertThat(HTTP_COMPLETION_STATE.converter.read( httpRowDataWrapper)) + .isEqualTo(StringData.fromString(testCompletionState.name())); + } + + private static @NotNull Map> + convertGenericMapDataToMap(GenericMapData genericMapData, ArrayData keys) { + Map> map = new HashMap<>(); + ArrayData valueArray = genericMapData.valueArray(); + // Extract and convert each key-value pair + for (int i = 0; i < keys.size(); i++) { + ArrayData values = valueArray.getArray(i); + StringData key = keys.getString(i); + String keyStr = key.toString(); + List valueList = new ArrayList<>(); + + // Extract each string from the array + for (int j = 0; j < values.size(); j++) { + StringData element = values.getString(j); + valueList.add(element.toString()); + } + + map.put(keyStr, valueList); + } + return map; + } + @Test @SuppressWarnings("unchecked") void shouldCreateTableSourceWithParams() { HttpLookupTableSource tableSource = - (HttpLookupTableSource) createTableSource(SCHEMA, getOptions()); + (HttpLookupTableSource) createTableSource(SCHEMA, getOptions()); LookupTableSource.LookupRuntimeProvider lookupProvider = - tableSource.getLookupRuntimeProvider(new LookupRuntimeProviderContext(lookupKey)); + tableSource.getLookupRuntimeProvider(new LookupRuntimeProviderContext(lookupKey)); HttpTableLookupFunction tableFunction = (HttpTableLookupFunction) - ((LookupFunctionProvider) lookupProvider).createLookupFunction(); + ((LookupFunctionProvider) lookupProvider).createLookupFunction(); LookupRow actualLookupRow = tableFunction.getLookupRow(); assertThat(actualLookupRow).isNotNull(); assertThat(actualLookupRow.getLookupEntries()).isNotEmpty(); assertThat(actualLookupRow.getLookupPhysicalRowDataType()) - .isEqualTo(PHYSICAL_ROW_DATA_TYPE); + .isEqualTo(PHYSICAL_ROW_DATA_TYPE); HttpLookupConfig actualLookupConfig = tableFunction.getOptions(); assertThat(actualLookupConfig).isNotNull(); assertThat( - actualLookupConfig.getReadableConfig().get( - ConfigOptions.key("connector").stringType().noDefaultValue()) + actualLookupConfig.getReadableConfig().get( + ConfigOptions.key("connector").stringType().noDefaultValue()) ) - .withFailMessage( - "Readable config probably was not passed from Table Factory or it is empty.") - .isNotNull(); + .withFailMessage( + "Readable config probably was not passed from Table Factory or it is empty.") + .isNotNull(); } @Test @@ -109,32 +200,32 @@ void shouldCreateAsyncTableSourceWithParams() { Map options = getOptionsWithAsync(); HttpLookupTableSource tableSource = - (HttpLookupTableSource) createTableSource(SCHEMA, options); + (HttpLookupTableSource) createTableSource(SCHEMA, options); AsyncLookupFunctionProvider lookupProvider = - (AsyncLookupFunctionProvider) - tableSource.getLookupRuntimeProvider( - new LookupRuntimeProviderContext(lookupKey)); + (AsyncLookupFunctionProvider) + tableSource.getLookupRuntimeProvider( + new LookupRuntimeProviderContext(lookupKey)); AsyncHttpTableLookupFunction tableFunction = - (AsyncHttpTableLookupFunction) lookupProvider.createAsyncLookupFunction(); + (AsyncHttpTableLookupFunction) lookupProvider.createAsyncLookupFunction(); LookupRow actualLookupRow = tableFunction.getLookupRow(); assertThat(actualLookupRow).isNotNull(); assertThat(actualLookupRow.getLookupEntries()).isNotEmpty(); assertThat(actualLookupRow.getLookupPhysicalRowDataType()) - .isEqualTo(PHYSICAL_ROW_DATA_TYPE); + .isEqualTo(PHYSICAL_ROW_DATA_TYPE); HttpLookupConfig actualLookupConfig = tableFunction.getOptions(); assertThat(actualLookupConfig).isNotNull(); assertThat(actualLookupConfig.isUseAsync()).isTrue(); assertThat( - actualLookupConfig.getReadableConfig().get(HttpLookupConnectorOptions.ASYNC_POLLING) + actualLookupConfig.getReadableConfig().get(HttpLookupConnectorOptions.ASYNC_POLLING) ) - .withFailMessage( - "Readable config probably was not passed" + - " from Table Factory or it is empty.") - .isTrue(); + .withFailMessage( + "Readable config probably was not passed" + + " from Table Factory or it is empty.") + .isTrue(); } @ParameterizedTest @@ -190,8 +281,8 @@ private static class TestSpec { Class expected; private TestSpec(boolean hasCache, - boolean isAsync, - Class expected) { + boolean isAsync, + Class expected) { this.hasCache = hasCache; this.isAsync = isAsync; this.expected = expected; @@ -246,8 +337,8 @@ private Map getOptionsWithAsync() { private Map getOptions() { return Map.of( - "connector", "rest-lookup", - "url", "http://localhost:8080/service", - "format", "json"); + "connector", "rest-lookup", + "url", "http://localhost:8080/service", + "format", "json"); } } diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpRowDataWrapperTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpRowDataWrapperTest.java new file mode 100644 index 00000000..dbc25968 --- /dev/null +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpRowDataWrapperTest.java @@ -0,0 +1,47 @@ +package com.getindata.connectors.http.internal.table.lookup; + +import java.util.Collections; +import java.util.HashMap; + +import org.junit.jupiter.api.Test; +import static org.assertj.core.api.Assertions.assertThat; + +public class HttpRowDataWrapperTest { + + @Test + void testshouldIgnore() { + HttpRowDataWrapper httpRowDataWrapper = HttpRowDataWrapper.builder() + .data(Collections.emptyList()) + .httpCompletionState(HttpCompletionState.SUCCESS) + .build(); + assertThat(httpRowDataWrapper.shouldIgnore()).isTrue(); + httpRowDataWrapper = HttpRowDataWrapper.builder() + .errorMessage("aa") + .httpCompletionState(HttpCompletionState.SUCCESS) + .build(); + assertThat(httpRowDataWrapper.shouldIgnore()).isFalse(); + httpRowDataWrapper = HttpRowDataWrapper.builder() + .data(Collections.emptyList()) + .errorMessage("aa") + .httpCompletionState(HttpCompletionState.SUCCESS) + .build(); + assertThat(httpRowDataWrapper.shouldIgnore()).isFalse(); + httpRowDataWrapper = HttpRowDataWrapper.builder() + .data(Collections.emptyList()) + .httpHeadersMap(new HashMap<>()) + .httpCompletionState(HttpCompletionState.SUCCESS) + .build(); + assertThat(httpRowDataWrapper.shouldIgnore()).isFalse(); + httpRowDataWrapper = HttpRowDataWrapper.builder() + .data(Collections.emptyList()) + .httpStatusCode(123) + .httpCompletionState(HttpCompletionState.SUCCESS) + .build(); + assertThat(httpRowDataWrapper.shouldIgnore()).isFalse(); + httpRowDataWrapper = HttpRowDataWrapper.builder() + .data(Collections.emptyList()) + .httpCompletionState(HttpCompletionState.EXCEPTION) + .build(); + assertThat(httpRowDataWrapper.shouldIgnore()).isFalse(); + } +} diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java index 59fbb265..a4b0293f 100644 --- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java @@ -141,7 +141,7 @@ void shouldQuery200WithParams() throws ConfigurationException { JavaNetHttpPollingClient pollingClient = setUpPollingClient(); // WHEN - Collection results = pollingClient.pull(lookupRowData); + Collection results = pollingClient.pull(lookupRowData).getData(); // THEN wireMockServer.verify(RequestPatternBuilder.forCustomMatcher(stubMapping.getRequest())); @@ -168,7 +168,7 @@ void shouldQuery200WithBodyParams(String methodName) throws ConfigurationExcepti JavaNetHttpPollingClient pollingClient = setUpPollingClient(setUpBodyRequestFactory(methodName)); // WHEN - Collection results = pollingClient.pull(lookupRowData); + Collection results = pollingClient.pull(lookupRowData).getData(); // THEN wireMockServer.verify(RequestPatternBuilder.forCustomMatcher(stubMapping.getRequest())); @@ -212,7 +212,7 @@ void shouldQuery200WithArrayResult() throws ConfigurationException { JavaNetHttpPollingClient pollingClient = setUpPollingClient(); // WHEN - Collection results = pollingClient.pull(lookupRowData); + Collection results = pollingClient.pull(lookupRowData).getData(); // THEN wireMockServer.verify(RequestPatternBuilder.forCustomMatcher(stubMapping.getRequest())); @@ -246,7 +246,7 @@ void shouldQuery200WithArrayResultWithNulls() throws ConfigurationException { JavaNetHttpPollingClient pollingClient = setUpPollingClient(); // WHEN - Collection results = pollingClient.pull(lookupRowData); + Collection results = pollingClient.pull(lookupRowData).getData(); // THEN wireMockServer.verify(RequestPatternBuilder.forCustomMatcher(stubMapping.getRequest())); @@ -278,7 +278,7 @@ void shouldHandleCodeBasedOnConfiguration( JavaNetHttpPollingClient pollingClient = setUpPollingClient(); // WHEN - Collection results = pollingClient.pull(lookupRowData); + Collection results = pollingClient.pull(lookupRowData).getData(); // THEN assertThat(results.isEmpty()).isEqualTo(isExpectedResponseEmpty); @@ -302,7 +302,7 @@ void shouldProcessWithMissingArguments() throws ConfigurationException { JavaNetHttpPollingClient pollingClient = setUpPollingClient(); // WHEN - Collection results = pollingClient.pull(null); + Collection results = pollingClient.pull(null).getData(); // THEN assertThat(results.isEmpty()).isTrue(); @@ -333,7 +333,7 @@ public void shouldConnectWithBasicAuth(String authorizationHeaderValue, JavaNetHttpPollingClient pollingClient = setUpPollingClient(); // WHEN - Collection results = pollingClient.pull(lookupRowData); + Collection results = pollingClient.pull(lookupRowData).getData(); // THEN wireMockServer.verify(RequestPatternBuilder.forCustomMatcher(stubMapping.getRequest())); diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientHttpsConnectionTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientHttpsConnectionTest.java index 69fb7fd5..3f88f936 100644 --- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientHttpsConnectionTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientHttpsConnectionTest.java @@ -263,7 +263,7 @@ public void shouldThrowOnInvalidPath( private void testPollingClientConnection() throws ConfigurationException { JavaNetHttpPollingClient pollingClient = setUpPollingClient(properties); - Collection result = pollingClient.pull(lookupRowData); + Collection result = pollingClient.pull(lookupRowData).getData(); assertResult(result); }