diff --git a/.gitignore b/.gitignore
index 4c816b7f..637d5966 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,3 +13,4 @@ bin
/src/test/test.iml
/flink-http-connector.iml
/dependency-reduced-pom.xml
+/.java-version
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a8931570..c7fbc729 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,11 @@
## [Unreleased]
+- Retries support for source table:
+ - Auto retry on IOException and user-defined http codes - parameter `gid.connector.http.source.lookup.retry-codes`.
+ - Parameters `gid.connector.http.source.lookup.error.code.exclude"` and `gid.connector.http.source.lookup.error.code` were replaced by `gid.connector.http.source.lookup.ignored-response-codes`.
+ - Added connection timeout for source table - `gid.connector.http.source.lookup.connection.timeout`.
+
## [0.19.0] - 2025-03-20
- OIDC token request to not flow during explain
diff --git a/README.md b/README.md
index a03196d7..2899d31a 100644
--- a/README.md
+++ b/README.md
@@ -170,8 +170,22 @@ The second one is set per individual HTTP requests by HTTP client. Its default v
Flink's current implementation of `AsyncTableFunction` does not allow specifying custom logic for handling Flink AsyncIO timeouts as it is for Java API.
Because of that, if AsyncIO timer passes, Flink will throw TimeoutException which will cause job restart.
-The HTTP request timeouts on the other hand will not cause Job restart. In that case, exception will be logged into application logs.
-To avoid job restart on timeouts caused by Lookup queries, the value of `gid.connector.http.source.lookup.request.timeout` should be smaller than `table.exec.async-lookup.timeout`.
+#### Retries (Lookup source)
+Lookup source handles auto-retries for two scenarios:
+1. IOException occurs (e.g. temporary network outage)
+2. The response contains a HTTP error code that indicates a retriable error. These codes are defined in the table configuration (see `gid.connector.http.source.lookup.retry-codes`).
+Retries are executed silently, without restarting the job. After reaching max retries attempts (per request) operation will fail and restart job.
+
+Notice that HTTP codes are categorized into into 3 groups:
+- successful responses - response is returned immediately for further processing
+- temporary errors - request will be retried up to the retry limit
+- error responses - unexpected responses are not retried and will fail the job. Any HTTP error code which is not configured as successful or temporary error is treated as an unretriable error.
+
+##### Retry strategy
+User can choose retry strategy type for source table:
+- fixed-delay - http request will be re-sent after specified delay.
+- exponential-delay - request will be re-sent with exponential backoff strategy, limited by `lookup.max-retries` attempts. The delay for each retry is calculated as the previous attempt's delay multiplied by the backoff multiplier (parameter `gid.connector.http.source.lookup.retry-strategy.exponential-delay.backoff-multiplier`) up to `gid.connector.http.source.lookup.retry-strategy.exponential-delay.max-backoff`. The initial delay value is defined in the table configuration as `gid.connector.http.source.lookup.retry-strategy.exponential-delay.initial-backoff`.
+
#### Lookup multiple results
@@ -391,19 +405,65 @@ is provided.
## HTTP status code handler
-Http Sink and Lookup Source connectors allow defining list of HTTP status codes that should be treated as errors.
-By default all 400s and 500s response codes will be interpreted as error code.
+### Sink table
+You can configure a list of HTTP status codes that should be treated as errors for HTTP sink table.
+By default all 400 and 500 response codes will be interpreted as error code.
-This behavior can be changed by using below properties in table definition (DDL) for Sink and Lookup Source or passing it via
-`setProperty' method from Sink's builder. The property names are:
-- `gid.connector.http.sink.error.code` and `gid.connector.http.source.lookup.error.code` used to defined HTTP status code value that should be treated as error for example 404.
+This behavior can be changed by using below properties in table definition (DDL) or passing it via `setProperty' method from Sink's builder. The property name are:
+- `gid.connector.http.sink.error.code` used to defined HTTP status code value that should be treated as error for example 404.
Many status codes can be defined in one value, where each code should be separated with comma, for example:
`401, 402, 403`. User can use this property also to define a type code mask. In that case, all codes from given HTTP response type will be treated as errors.
An example of such a mask would be `3XX, 4XX, 5XX`. In this case, all 300s, 400s and 500s status codes will be treated as errors.
-- `gid.connector.http.sink.error.code.exclude` and `gid.connector.http.source.lookup.error.code.exclude` used to exclude a HTTP code from error list.
+- `gid.connector.http.sink.error.code.exclude` used to exclude a HTTP code from error list.
Many status codes can be defined in one value, where each code should be separated with comma, for example:
`401, 402, 403`. In this example, codes 401, 402 and 403 would not be interpreted as error codes.
+### Source table
+The source table categorizes HTTP responses into three groups based on status codes:
+- Retry codes (`gid.connector.http.source.lookup.retry-codes`):
+Responses in this group indicate a temporary issue (it can be e.g., HTTP 503 Service Unavailable). When such a response is received, the request should be retried.
+- Success codes (`gid.connector.http.source.lookup.success-codes`):
+These are expected responses that should be processed by table function. The response body can be ignored by specifying its status code additionally in the `gid.connector.http.source.lookup.ignored-response-codes` parameter. For example, an HTTP 404 Not Found response is valid and indicates that the requested item does not exist, so its content can be ignored.
+- Error codes:
+Any response code that is not classified as a retry or success code falls into this category. Receiving such a response will result in a job failure.
+
+
+Above parameters support whitelisting and blacklisting. A sample configuration may look like this:
+`2XX,404,!203` - meaning all codes from group 2XX (200-299), with 404 and without 203 ('!' character). Group blacklisting e.g. !2XX is not supported.
+Notice that ignored-response-codes has to be a subset of success-codes.
+
+The same format is used in parameter `gid.connector.http.source.lookup.retry-codes`.
+
+Example with explanation:
+```roomsql
+CREATE TABLE [...]
+WITH (
+ [...],
+ 'gid.connector.http.source.lookup.success-codes' = '2XX,404',
+ 'gid.connector.http.source.lookup.retry-codes' = '5XX,!501,!505,!506',
+ 'gid.connector.http.source.lookup.ignored-response-codes' = '404'
+)
+```
+All 200s codes and 404 are considered as successful (`success-codes`). These responses won't cause retry or job failure. 404 response is also listed in `ignored-response-codes` parameter, what means content body will be ignored. Http with 404 code will produce just empty record. Notice that 404 has to be specified in both `success-codes` and `ignored-response-codes`.
+When server returns response with 500s code except 501, 505 and 506 then connector will re-send request based on configuration in `gid.connector.http.source.lookup.retry-strategy` parameters. By default it's fixed-delay with 1 second delay, up to 3 times per request (parameter `lookup.max-retries`). After exceeding max-retries limit the job will fail.
+A response with any other code than specified in params `success-codes` and `retry-codes` e.g. 400, 505, 301 will cause job failure.
+
+
+```roomsql
+CREATE TABLE [...]
+WITH (
+ [...],
+ 'gid.connector.http.source.lookup.success-codes' = '1XX,2XX,3XX,4XX,5XX',
+ 'gid.connector.http.source.lookup.retry-codes' = '',
+ 'gid.connector.http.source.lookup.ignored-response-codes' = '1XX,3XX,4XX,5XX'
+)
+```
+For this configuration, all responses are treated as successful, meaning no retries will be triggered based on HTTP response codes. However, only responses with status code 200 will be parsed and processed by the Flink operator. Responses with status codes in the 100s, 300s, 400s, and 500s ranges are categorized as `ignored-response-codes`.
+Note that retries remain enabled and will still occur on IOException.
+To disable retries, set `'lookup.max-retries' = '0'`.
+
+
+
## TLS (more secure replacement for SSL) and mTLS support
Both Http Sink and Lookup Source connectors support HTTPS communication using TLS 1.2 and mTLS.
@@ -452,33 +512,42 @@ be requested if the current time is later than the cached token expiry time minu
## Table API Connector Options
### HTTP TableLookup Source
-| Option | Required | Description/Value |
-|---------------------------------------------------------------|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| connector | required | The Value should be set to _rest-lookup_ |
-| format | required | Flink's format name that should be used to decode REST response, Use `json` for a typical REST endpoint. |
-| url | required | The base URL that should be use for GET requests. For example _http://localhost:8080/client_ |
-| asyncPolling | optional | true/false - determines whether Async Polling should be used. Mechanism is based on Flink's Async I/O. |
-| lookup-method | optional | GET/POST/PUT (and any other) - determines what REST method should be used for lookup REST query. If not specified, `GET` method will be used. |
-| lookup.cache | optional | Enum possible values: `NONE`, `PARTIAL`. The cache strategy for the lookup table. Currently supports `NONE` (no caching) and `PARTIAL` (caching entries on lookup operation in external API). |
-| lookup.partial-cache.max-rows | optional | The max number of rows of lookup cache, over this value, the oldest rows will be expired. `lookup.cache` must be set to `PARTIAL` to use this option. See the following Lookup Cache section for more details. |
-| lookup.partial-cache.expire-after-write | optional | The max time to live for each rows in lookup cache after writing into the cache. Specify as a [Duration](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#duration). `lookup.cache` must be set to `PARTIAL` to use this option. See the following Lookup Cache section for more details. |
-| lookup.partial-cache.expire-after-access | optional | The max time to live for each rows in lookup cache after accessing the entry in the cache. Specify as a [Duration](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#duration). `lookup.cache` must be set to `PARTIAL` to use this option. See the following Lookup Cache section for more details. |
-| lookup.partial-cache.cache-missing-key | optional | This is a boolean that defaults to true. Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table. `lookup.cache` must be set to `PARTIAL` to use this option. See the following Lookup Cache section for more details. |
-| lookup.max-retries | optional | The max retry times if the lookup failed; default is 3. See the following Lookup Cache section for more details. |
-| gid.connector.http.lookup.error.code | optional | List of HTTP status codes that should be treated as errors by HTTP Source, separated with comma. |
-| gid.connector.http.lookup.error.code.exclude | optional | List of HTTP status codes that should be excluded from the `gid.connector.http.lookup.error.code` list, separated with comma. |
-| gid.connector.http.security.cert.server | optional | Comma separated paths to trusted HTTP server certificates that should be added to the connectors trust store. |
-| gid.connector.http.security.cert.client | optional | Path to trusted certificate that should be used by connector's HTTP client for mTLS communication. |
-| gid.connector.http.security.key.client | optional | Path to trusted private key that should be used by connector's HTTP client for mTLS communication. |
-| gid.connector.http.security.cert.server.allowSelfSigned | optional | Accept untrusted certificates for TLS communication. |
-| gid.connector.http.security.oidc.token.request | optional | OIDC `Token Request` body in `application/x-www-form-urlencoded` encoding |
-| gid.connector.http.security.oidc.token.endpoint.url | optional | OIDC `Token Endpoint` url, to which the token request will be issued |
-| gid.connector.http.security.oidc.token.expiry.reduction | optional | OIDC tokens will be requested if the current time is later than the cached token expiry time minus this value. |
-| gid.connector.http.source.lookup.request.timeout | optional | Sets HTTP request timeout in seconds. If not specified, the default value of 30 seconds will be used. |
-| gid.connector.http.source.lookup.request.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup request processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 8 threads will be used. |
-| gid.connector.http.source.lookup.response.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup response processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 4 threads will be used. |
-| gid.connector.http.source.lookup.use-raw-authorization-header | optional | If set to `'true'`, uses the raw value set for the `Authorization` header, without transformation for Basic Authentication (base64, addition of "Basic " prefix). If not specified, defaults to `'false'`. |
-| gid.connector.http.source.lookup.request-callback | optional | Specify which `HttpLookupPostRequestCallback` implementation to use. By default, it is set to `slf4j-lookup-logger` corresponding to `Slf4jHttpLookupPostRequestCallback`. |
+| Option | Required | Description/Value |
+|--------------------------------------------------------------------------------------|----------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| connector | required | The Value should be set to _rest-lookup_ |
+| format | required | Flink's format name that should be used to decode REST response, Use `json` for a typical REST endpoint. |
+| url | required | The base URL that should be use for GET requests. For example _http://localhost:8080/client_ |
+| asyncPolling | optional | true/false - determines whether Async Polling should be used. Mechanism is based on Flink's Async I/O. |
+| lookup-method | optional | GET/POST/PUT (and any other) - determines what REST method should be used for lookup REST query. If not specified, `GET` method will be used. |
+| lookup.cache | optional | Enum possible values: `NONE`, `PARTIAL`. The cache strategy for the lookup table. Currently supports `NONE` (no caching) and `PARTIAL` (caching entries on lookup operation in external API). |
+| lookup.partial-cache.max-rows | optional | The max number of rows of lookup cache, over this value, the oldest rows will be expired. `lookup.cache` must be set to `PARTIAL` to use this option. See the following Lookup Cache section for more details. |
+| lookup.partial-cache.expire-after-write | optional | The max time to live for each rows in lookup cache after writing into the cache. Specify as a [Duration](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#duration). `lookup.cache` must be set to `PARTIAL` to use this option. See the following Lookup Cache section for more details. |
+| lookup.partial-cache.expire-after-access | optional | The max time to live for each rows in lookup cache after accessing the entry in the cache. Specify as a [Duration](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#duration). `lookup.cache` must be set to `PARTIAL` to use this option. See the following Lookup Cache section for more details. |
+| lookup.partial-cache.cache-missing-key | optional | This is a boolean that defaults to true. Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table. `lookup.cache` must be set to `PARTIAL` to use this option. See the following Lookup Cache section for more details. |
+| lookup.max-retries | optional | The max retry times if the lookup failed; default is 3. See the following Lookup Cache section for more detail. Set value 0 to disable retries. |
+| gid.connector.http.lookup.error.code | optional | List of HTTP status codes that should be treated as errors by HTTP Source, separated with comma. |
+| gid.connector.http.lookup.error.code.exclude | optional | List of HTTP status codes that should be excluded from the `gid.connector.http.lookup.error.code` list, separated with comma. |
+| gid.connector.http.security.cert.server | optional | Comma separated paths to trusted HTTP server certificates that should be added to the connectors trust store. |
+| gid.connector.http.security.cert.client | optional | Path to trusted certificate that should be used by connector's HTTP client for mTLS communication. |
+| gid.connector.http.security.key.client | optional | Path to trusted private key that should be used by connector's HTTP client for mTLS communication. |
+| gid.connector.http.security.cert.server.allowSelfSigned | optional | Accept untrusted certificates for TLS communication. |
+| gid.connector.http.security.oidc.token.request | optional | OIDC `Token Request` body in `application/x-www-form-urlencoded` encoding |
+| gid.connector.http.security.oidc.token.endpoint.url | optional | OIDC `Token Endpoint` url, to which the token request will be issued |
+| gid.connector.http.security.oidc.token.expiry.reduction | optional | OIDC tokens will be requested if the current time is later than the cached token expiry time minus this value. |
+| gid.connector.http.source.lookup.request.timeout | optional | Sets HTTP request timeout in seconds. If not specified, the default value of 30 seconds will be used. |
+| gid.connector.http.source.lookup.request.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup request processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 8 threads will be used. |
+| gid.connector.http.source.lookup.response.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup response processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 4 threads will be used. |
+| gid.connector.http.source.lookup.use-raw-authorization-header | optional | If set to `'true'`, uses the raw value set for the `Authorization` header, without transformation for Basic Authentication (base64, addition of "Basic " prefix). If not specified, defaults to `'false'`. |
+| gid.connector.http.source.lookup.request-callback | optional | Specify which `HttpLookupPostRequestCallback` implementation to use. By default, it is set to `slf4j-lookup-logger` corresponding to `Slf4jHttpLookupPostRequestCallback`. |
+| gid.connector.http.source.lookup.connection.timeout | optional | Source table connection timeout. Default - no value. |
+| gid.connector.http.source.lookup.success-codes | optional | Comma separated http codes considered as success response. Use [1-5]XX for groups and '!' character for excluding. |
+| gid.connector.http.source.lookup.retry-codes | optional | Comma separated http codes considered as transient errors. Use [1-5]XX for groups and '!' character for excluding. |
+| gid.connector.http.source.lookup.ignored-response-codes | optional | Comma separated http codes. Content for these responses will be ignored. Use [1-5]XX for groups and '!' character for excluding. Ignored response codes has to be a subset of `gid.connector.http.source.lookup.success-codes`. |
+| gid.connector.http.source.lookup.retry-strategy.type | optional | Auto retry strategy type: fixed-delay (default) or exponential-delay. |
+| gid.connector.http.source.lookup.retry-strategy.fixed-delay.delay | optional | Fixed-delay interval between retries. Default 1 second. Use with`lookup.max-retries` parameter. |
+| gid.connector.http.source.lookup.retry-strategy.exponential-delay.initial-backoff | optional | Exponential-delay initial delay. Default 1 second. |
+| gid.connector.http.source.lookup.retry-strategy.exponential-delay.max-backoff | optional | Exponential-delay maximum delay. Default 1 minute. Use with `lookup.max-retries` parameter. |
+| gid.connector.http.source.lookup.retry-strategy.exponential-delay.backoff-multiplier | optional | Exponential-delay multiplier. Default value 1.5 |
### HTTP Sink
@@ -604,11 +673,13 @@ The mapping from Http Json Response to SQL table schema is done via Flink's Json
- Changed API for public HttpSink builder. The `setHttpPostRequestCallback` expects a `PostRequestCallback`
of generic type [HttpRequest](src/main/java/com/getindata/connectors/http/internal/sink/httpclient/HttpRequest.java)
instead `HttpSinkRequestEntry`.
+- Version 0.20
+ - Http source table parameters: `gid.connector.http.source.lookup.error.code` and `gid.connector.http.source.lookup.error.code.exclude` were removed. These parameters described http status codes which was silently ignored by source lookup table (logged only). it's not recommended to ignore all error response but it's still possible. To do this set all codes as success: `'gid.connector.http.source.lookup.success-codes' = '1XX,2XX,3XX,4XX,5XX'` and ignore body from the others responses than 200s: `'gid.connector.http.source.lookup.ignored-response-codes' = '1XX,3XX,4XX,5XX'`. You can still exclude some error codes marking it as transition errors - `gid.connector.http.source.lookup.retry-codes`. Retry-codes have to be excluded from both `success-codes` and `ignored-response-codes`.
+ - Added dependency io.github.resilience4j:resilience4j-retry
## TODO
### HTTP TableLookup Source
-- Think about Retry Policy for Http Request
- Check other `//TODO`'s.
### HTTP Sink
diff --git a/dev/checkstyle.xml b/dev/checkstyle.xml
index 80323b15..ffd226b6 100644
--- a/dev/checkstyle.xml
+++ b/dev/checkstyle.xml
@@ -65,7 +65,7 @@
-
+
diff --git a/pom.xml b/pom.xml
index f33e333c..aa0eeafd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,10 +75,8 @@ under the License.
2.12
${target.java.version}
${target.java.version}
- 2.17.2
1.18.22
2.18.1
- 4.13.2
5.10.1
${junit5.version}
3.21.0
@@ -87,6 +85,8 @@ under the License.
0.8.12
3.1.1
4.6.1
+ 1.7.1
+ 2.0.17
@@ -119,25 +119,17 @@ under the License.
provided
-
-
-
- org.apache.logging.log4j
- log4j-slf4j-impl
- ${log4j.version}
- provided
-
- org.apache.logging.log4j
- log4j-api
- ${log4j.version}
- provided
+ org.slf4j
+ slf4j-api
+ ${slf4j.version}
+
- org.apache.logging.log4j
- log4j-core
- ${log4j.version}
- provided
+ org.slf4j
+ slf4j-simple
+ ${slf4j.version}
+ test
@@ -167,6 +159,12 @@ under the License.
provided
+
+ io.github.resilience4j
+ resilience4j-retry
+ ${resilence4j.version}
+
+
org.apache.httpcomponents
diff --git a/src/main/java/com/getindata/connectors/http/HttpStatusCodeValidationFailedException.java b/src/main/java/com/getindata/connectors/http/HttpStatusCodeValidationFailedException.java
new file mode 100644
index 00000000..cad25b29
--- /dev/null
+++ b/src/main/java/com/getindata/connectors/http/HttpStatusCodeValidationFailedException.java
@@ -0,0 +1,15 @@
+package com.getindata.connectors.http;
+
+import java.net.http.HttpResponse;
+
+import lombok.Getter;
+
+@Getter
+public class HttpStatusCodeValidationFailedException extends Exception {
+ private final HttpResponse> response;
+
+ public HttpStatusCodeValidationFailedException(String message, HttpResponse> response) {
+ super(message);
+ this.response = response;
+ }
+}
diff --git a/src/main/java/com/getindata/connectors/http/internal/PollingClient.java b/src/main/java/com/getindata/connectors/http/internal/PollingClient.java
index a035e8a8..925e10b6 100644
--- a/src/main/java/com/getindata/connectors/http/internal/PollingClient.java
+++ b/src/main/java/com/getindata/connectors/http/internal/PollingClient.java
@@ -3,6 +3,7 @@
import java.util.Collection;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
/**
* A client that is used to get enrichment data from external component.
@@ -15,4 +16,10 @@ public interface PollingClient {
* @return an optional result of data lookup.
*/
Collection pull(RowData lookupRow);
+
+ /**
+ * Initialize the client.
+ * @param ctx function context
+ */
+ void open(FunctionContext ctx);
}
diff --git a/src/main/java/com/getindata/connectors/http/internal/PollingClientFactory.java b/src/main/java/com/getindata/connectors/http/internal/PollingClientFactory.java
index cd6334aa..7981558d 100644
--- a/src/main/java/com/getindata/connectors/http/internal/PollingClientFactory.java
+++ b/src/main/java/com/getindata/connectors/http/internal/PollingClientFactory.java
@@ -3,6 +3,7 @@
import java.io.Serializable;
import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.util.ConfigurationException;
import com.getindata.connectors.http.internal.table.lookup.HttpLookupConfig;
@@ -11,5 +12,5 @@ public interface PollingClientFactory extends Serializable {
PollingClient createPollClient(
HttpLookupConfig options,
DeserializationSchema schemaDecoder
- );
+ ) throws ConfigurationException;
}
diff --git a/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java b/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java
index 61c413c8..8bb5c277 100644
--- a/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java
+++ b/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java
@@ -18,14 +18,14 @@ public final class HttpConnectorConfigConstants {
* A property prefix for http connector.
*/
public static final String GID_CONNECTOR_HTTP = "gid.connector.http.";
+ private static final String SOURCE_LOOKUP_PREFIX = GID_CONNECTOR_HTTP + "source.lookup.";
/**
* A property prefix for http connector header properties
*/
public static final String SINK_HEADER_PREFIX = GID_CONNECTOR_HTTP + "sink.header.";
- public static final String LOOKUP_SOURCE_HEADER_PREFIX = GID_CONNECTOR_HTTP
- + "source.lookup.header.";
+ public static final String LOOKUP_SOURCE_HEADER_PREFIX = SOURCE_LOOKUP_PREFIX + "header.";
public static final String OIDC_AUTH_TOKEN_REQUEST = GID_CONNECTOR_HTTP
+ "security.oidc.token.request";
@@ -40,33 +40,24 @@ public final class HttpConnectorConfigConstants {
* the special treatment of the header for Basic Authentication, thus preserving the passed
* raw value. Defaults to false.
*/
- public static final String LOOKUP_SOURCE_HEADER_USE_RAW = GID_CONNECTOR_HTTP
- + "source.lookup.use-raw-authorization-header";
+ public static final String LOOKUP_SOURCE_HEADER_USE_RAW = SOURCE_LOOKUP_PREFIX + "use-raw-authorization-header";
- public static final String RESULT_TYPE = GID_CONNECTOR_HTTP
- + "source.lookup.result-type";
+ public static final String RESULT_TYPE = SOURCE_LOOKUP_PREFIX + "result-type";
// --------- Error code handling configuration ---------
- public static final String HTTP_ERROR_SINK_CODE_WHITE_LIST =
- GID_CONNECTOR_HTTP + "sink.error.code.exclude";
+ public static final String HTTP_ERROR_SINK_CODE_WHITE_LIST = GID_CONNECTOR_HTTP + "sink.error.code.exclude";
public static final String HTTP_ERROR_SINK_CODES_LIST = GID_CONNECTOR_HTTP + "sink.error.code";
-
- public static final String HTTP_ERROR_SOURCE_LOOKUP_CODE_WHITE_LIST =
- GID_CONNECTOR_HTTP + "source.lookup.error.code.exclude";
-
- public static final String HTTP_ERROR_SOURCE_LOOKUP_CODES_LIST =
- GID_CONNECTOR_HTTP + "source.lookup.error.code";
// -----------------------------------------------------
public static final String SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER =
- GID_CONNECTOR_HTTP + "source.lookup.request-callback";
+ SOURCE_LOOKUP_PREFIX + "request-callback";
public static final String SINK_REQUEST_CALLBACK_IDENTIFIER =
GID_CONNECTOR_HTTP + "sink.request-callback";
public static final String SOURCE_LOOKUP_QUERY_CREATOR_IDENTIFIER =
- GID_CONNECTOR_HTTP + "source.lookup.query-creator";
+ SOURCE_LOOKUP_PREFIX + "query-creator";
// -------------- HTTPS security settings --------------
public static final String ALLOW_SELF_SIGNED =
@@ -92,16 +83,19 @@ public final class HttpConnectorConfigConstants {
// ------ HTTPS timeouts and thread pool settings ------
public static final String LOOKUP_HTTP_TIMEOUT_SECONDS =
- GID_CONNECTOR_HTTP + "source.lookup.request.timeout";
+ SOURCE_LOOKUP_PREFIX + "request.timeout";
+
+ public static final String SOURCE_CONNECTION_TIMEOUT =
+ SOURCE_LOOKUP_PREFIX + "connection.timeout";
public static final String SINK_HTTP_TIMEOUT_SECONDS =
GID_CONNECTOR_HTTP + "sink.request.timeout";
public static final String LOOKUP_HTTP_PULING_THREAD_POOL_SIZE =
- GID_CONNECTOR_HTTP + "source.lookup.request.thread-pool.size";
+ SOURCE_LOOKUP_PREFIX + "request.thread-pool.size";
public static final String LOOKUP_HTTP_RESPONSE_THREAD_POOL_SIZE =
- GID_CONNECTOR_HTTP + "source.lookup.response.thread-pool.size";
+ SOURCE_LOOKUP_PREFIX + "response.thread-pool.size";
public static final String SINK_HTTP_WRITER_THREAD_POOL_SIZE =
GID_CONNECTOR_HTTP + "sink.writer.thread-pool.size";
@@ -117,4 +111,21 @@ public final class HttpConnectorConfigConstants {
GID_CONNECTOR_HTTP + "sink.request.batch.size";
// ---------------------------------------------
+ public static final String SOURCE_RETRY_SUCCESS_CODES = SOURCE_LOOKUP_PREFIX + "success-codes";
+ public static final String SOURCE_RETRY_RETRY_CODES = SOURCE_LOOKUP_PREFIX + "retry-codes";
+ public static final String SOURCE_IGNORE_RESPONSE_CODES = SOURCE_LOOKUP_PREFIX + "ignored-response-codes";
+
+ public static final String SOURCE_RETRY_STRATEGY_PREFIX = SOURCE_LOOKUP_PREFIX + "retry-strategy.";
+ public static final String SOURCE_RETRY_STRATEGY_TYPE = SOURCE_RETRY_STRATEGY_PREFIX + "type";
+
+ private static final String SOURCE_RETRY_FIXED_DELAY_PREFIX = SOURCE_RETRY_STRATEGY_PREFIX + "fixed-delay.";
+ public static final String SOURCE_RETRY_FIXED_DELAY_DELAY = SOURCE_RETRY_FIXED_DELAY_PREFIX + "delay";
+
+ private static final String SOURCE_RETRY_EXP_DELAY_PREFIX = SOURCE_RETRY_STRATEGY_PREFIX + "exponential-delay.";
+ public static final String SOURCE_RETRY_EXP_DELAY_INITIAL_BACKOFF =
+ SOURCE_RETRY_EXP_DELAY_PREFIX + "initial-backoff";
+ public static final String SOURCE_RETRY_EXP_DELAY_MAX_BACKOFF =
+ SOURCE_RETRY_EXP_DELAY_PREFIX + "max-backoff";
+ public static final String SOURCE_RETRY_EXP_DELAY_MULTIPLIER =
+ SOURCE_RETRY_EXP_DELAY_PREFIX + "backoff-multiplier";
}
diff --git a/src/main/java/com/getindata/connectors/http/internal/retry/HttpClientWithRetry.java b/src/main/java/com/getindata/connectors/http/internal/retry/HttpClientWithRetry.java
new file mode 100644
index 00000000..bdb424aa
--- /dev/null
+++ b/src/main/java/com/getindata/connectors/http/internal/retry/HttpClientWithRetry.java
@@ -0,0 +1,71 @@
+package com.getindata.connectors.http.internal.retry;
+
+import java.io.IOException;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.util.function.Supplier;
+
+import io.github.resilience4j.retry.Retry;
+import io.github.resilience4j.retry.RetryConfig;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.metrics.MetricGroup;
+
+import com.getindata.connectors.http.HttpStatusCodeValidationFailedException;
+import com.getindata.connectors.http.internal.status.HttpResponseChecker;
+
+@Slf4j
+public class HttpClientWithRetry {
+
+ private final HttpClient httpClient;
+ @Getter
+ private final HttpResponseChecker responseChecker;
+ private final Retry retry;
+
+ @Builder
+ HttpClientWithRetry(HttpClient httpClient,
+ RetryConfig retryConfig,
+ HttpResponseChecker responseChecker) {
+ this.httpClient = httpClient;
+ this.responseChecker = responseChecker;
+ var adjustedRetryConfig = RetryConfig.from(retryConfig)
+ .retryExceptions(IOException.class)
+ .retryOnResult(this::isTemporalError)
+ .build();
+ this.retry = Retry.of("http-lookup-connector", adjustedRetryConfig);
+ }
+
+ public void registerMetrics(MetricGroup metrics){
+ var group = metrics.addGroup("http_lookup_connector");
+ group.gauge("successfulCallsWithRetryAttempt",
+ () -> retry.getMetrics().getNumberOfSuccessfulCallsWithRetryAttempt());
+ group.gauge("successfulCallsWithoutRetryAttempt",
+ () -> retry.getMetrics().getNumberOfSuccessfulCallsWithoutRetryAttempt());
+ }
+
+ public HttpResponse send(
+ Supplier requestSupplier,
+ HttpResponse.BodyHandler responseBodyHandler
+ ) throws IOException, InterruptedException, HttpStatusCodeValidationFailedException {
+ try {
+ var response = Retry.decorateCheckedSupplier(retry,
+ () -> httpClient.send(requestSupplier.get(), responseBodyHandler)).apply();
+ if (!responseChecker.isSuccessful(response)) {
+ throw new HttpStatusCodeValidationFailedException(
+ "Incorrect response code: " + response.statusCode(), response);
+ }
+ return response;
+ } catch (IOException | InterruptedException | HttpStatusCodeValidationFailedException e) {
+ throw e; //re-throw without wrapping
+ } catch (Throwable t) {
+ throw new RuntimeException("Unexpected exception", t);
+ }
+ }
+
+ private boolean isTemporalError(Object response) {
+ return responseChecker.isTemporalError((HttpResponse>) response);
+ }
+}
+
diff --git a/src/main/java/com/getindata/connectors/http/internal/retry/RetryConfigProvider.java b/src/main/java/com/getindata/connectors/http/internal/retry/RetryConfigProvider.java
new file mode 100644
index 00000000..0a09d7d9
--- /dev/null
+++ b/src/main/java/com/getindata/connectors/http/internal/retry/RetryConfigProvider.java
@@ -0,0 +1,58 @@
+package com.getindata.connectors.http.internal.retry;
+
+import io.github.resilience4j.core.IntervalFunction;
+import io.github.resilience4j.retry.RetryConfig;
+import lombok.AccessLevel;
+import lombok.RequiredArgsConstructor;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.source.lookup.LookupOptions;
+import static io.github.resilience4j.core.IntervalFunction.ofExponentialBackoff;
+
+import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_RETRY_EXPONENTIAL_DELAY_INITIAL_BACKOFF;
+import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_RETRY_EXPONENTIAL_DELAY_MAX_BACKOFF;
+import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_RETRY_EXPONENTIAL_DELAY_MULTIPLIER;
+import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_RETRY_FIXED_DELAY_DELAY;
+import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_RETRY_STRATEGY;
+
+@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
+public class RetryConfigProvider {
+
+ private final ReadableConfig config;
+
+ public static RetryConfig create(ReadableConfig config) {
+ return new RetryConfigProvider(config).create();
+ }
+
+ private RetryConfig create() {
+ return createBuilder()
+ .maxAttempts(config.get(LookupOptions.MAX_RETRIES) + 1)
+ .build();
+ }
+
+ private RetryConfig.Builder> createBuilder() {
+ var retryStrategy = getRetryStrategy();
+ if (retryStrategy == RetryStrategyType.FIXED_DELAY) {
+ return configureFixedDelay();
+ } else if (retryStrategy == RetryStrategyType.EXPONENTIAL_DELAY) {
+ return configureExponentialDelay();
+ }
+ throw new IllegalArgumentException("Unsupported retry strategy: " + retryStrategy);
+ }
+
+ private RetryStrategyType getRetryStrategy() {
+ return RetryStrategyType.fromCode(config.get(SOURCE_LOOKUP_RETRY_STRATEGY));
+ }
+
+ private RetryConfig.Builder> configureFixedDelay() {
+ return RetryConfig.custom()
+ .intervalFunction(IntervalFunction.of(config.get(SOURCE_LOOKUP_RETRY_FIXED_DELAY_DELAY)));
+ }
+
+ private RetryConfig.Builder> configureExponentialDelay() {
+ var initialDelay = config.get(SOURCE_LOOKUP_RETRY_EXPONENTIAL_DELAY_INITIAL_BACKOFF);
+ var maxDelay = config.get(SOURCE_LOOKUP_RETRY_EXPONENTIAL_DELAY_MAX_BACKOFF);
+ var multiplier = config.get(SOURCE_LOOKUP_RETRY_EXPONENTIAL_DELAY_MULTIPLIER);
+ return RetryConfig.custom()
+ .intervalFunction(ofExponentialBackoff(initialDelay, multiplier, maxDelay));
+ }
+}
diff --git a/src/main/java/com/getindata/connectors/http/internal/retry/RetryStrategyType.java b/src/main/java/com/getindata/connectors/http/internal/retry/RetryStrategyType.java
new file mode 100644
index 00000000..b9c8876d
--- /dev/null
+++ b/src/main/java/com/getindata/connectors/http/internal/retry/RetryStrategyType.java
@@ -0,0 +1,27 @@
+package com.getindata.connectors.http.internal.retry;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+@Getter
+@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
+public enum RetryStrategyType {
+ FIXED_DELAY("fixed-delay"),
+ EXPONENTIAL_DELAY("exponential-delay"),
+ ;
+
+ private final String code;
+
+ public static RetryStrategyType fromCode(String code) {
+ if (code == null) {
+ throw new NullPointerException("Code is null");
+ }
+ for (var strategy : RetryStrategyType.values()) {
+ if (strategy.getCode().equalsIgnoreCase(code)) {
+ return strategy;
+ }
+ }
+ throw new IllegalArgumentException("No enum constant for " + code);
+ }
+}
diff --git a/src/main/java/com/getindata/connectors/http/internal/status/HttpCodesParser.java b/src/main/java/com/getindata/connectors/http/internal/status/HttpCodesParser.java
new file mode 100644
index 00000000..1f7a52cd
--- /dev/null
+++ b/src/main/java/com/getindata/connectors/http/internal/status/HttpCodesParser.java
@@ -0,0 +1,63 @@
+package com.getindata.connectors.http.internal.status;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import static java.lang.String.format;
+
+import lombok.experimental.UtilityClass;
+import org.apache.flink.util.ConfigurationException;
+
+@UtilityClass
+public class HttpCodesParser {
+
+ private final Pattern CODE_GROUP_EXPRESSION = Pattern.compile("[1-5][xX]{2}");
+ private final String DELIMITER = Pattern.quote(",");
+ private final int HTTP_CODE_MIN = 100;
+ private final int HTTP_CODE_MAX = 599;
+
+ public Set parse(String codesExpression) throws ConfigurationException {
+ var whitelist = new HashSet();
+ var blacklist = new HashSet();
+ for (var rawCode : codesExpression.split(DELIMITER)) {
+ var code = rawCode.trim();
+ if (code.isEmpty()) {
+ continue;
+ }
+ if (code.startsWith("!")) {
+ try {
+ blacklist.add(parseHttpCode(code.substring(1)));
+ continue;
+ } catch (NumberFormatException e) {
+ throw new ConfigurationException("Can not parse code " + code);
+ }
+ }
+ try {
+ whitelist.add(parseHttpCode(code));
+ } catch (NumberFormatException e) {
+ if (CODE_GROUP_EXPRESSION.matcher(code).matches()) {
+ var firstGroupCode = Integer.parseInt(code.substring(0, 1)) * 100;
+ var groupCodes = IntStream.range(firstGroupCode, firstGroupCode + 100)
+ .boxed().collect(Collectors.toList());
+ whitelist.addAll(groupCodes);
+ } else {
+ throw new ConfigurationException("Can not parse code " + code);
+ }
+ }
+ }
+
+ whitelist.removeAll(blacklist);
+ return Collections.unmodifiableSet(whitelist);
+ }
+
+ private Integer parseHttpCode(String str) throws ConfigurationException {
+ var parsed = Integer.parseInt(str);
+ if (parsed < HTTP_CODE_MIN || parsed > HTTP_CODE_MAX) {
+ throw new ConfigurationException(format("Http code out of the range [%s]", parsed));
+ }
+ return parsed;
+ }
+}
diff --git a/src/main/java/com/getindata/connectors/http/internal/status/HttpResponseChecker.java b/src/main/java/com/getindata/connectors/http/internal/status/HttpResponseChecker.java
new file mode 100644
index 00000000..0aaf9007
--- /dev/null
+++ b/src/main/java/com/getindata/connectors/http/internal/status/HttpResponseChecker.java
@@ -0,0 +1,56 @@
+package com.getindata.connectors.http.internal.status;
+
+import java.net.http.HttpResponse;
+import java.util.HashSet;
+import java.util.Set;
+
+import lombok.Getter;
+import lombok.NonNull;
+import org.apache.flink.util.ConfigurationException;
+
+@Getter
+public class HttpResponseChecker {
+
+ private final Set successCodes;
+ private final Set temporalErrorCodes;
+
+ public HttpResponseChecker(@NonNull String successCodeExpr, @NonNull String temporalErrorCodeExpr)
+ throws ConfigurationException {
+ this(HttpCodesParser.parse(successCodeExpr), HttpCodesParser.parse(temporalErrorCodeExpr));
+ }
+
+ public HttpResponseChecker(@NonNull Set successCodes, @NonNull Set temporalErrorCodes)
+ throws ConfigurationException {
+ this.successCodes = successCodes;
+ this.temporalErrorCodes = temporalErrorCodes;
+ validate();
+ }
+
+ public boolean isSuccessful(HttpResponse> response) {
+ return isSuccessful(response.statusCode());
+ }
+
+ public boolean isSuccessful(int httpStatusCode) {
+ return successCodes.contains(httpStatusCode);
+ }
+
+ public boolean isTemporalError(HttpResponse> response) {
+ return isTemporalError(response.statusCode());
+ }
+
+ public boolean isTemporalError(int httpStatusCode) {
+ return temporalErrorCodes.contains(httpStatusCode);
+ }
+
+ private void validate() throws ConfigurationException {
+ if (successCodes.isEmpty()) {
+ throw new ConfigurationException("Success code list can not be empty");
+ }
+ var intersection = new HashSet<>(successCodes);
+ intersection.retainAll(temporalErrorCodes);
+ if (!intersection.isEmpty()) {
+ throw new ConfigurationException("Http codes " + intersection +
+ " can not be used as both success and retry codes");
+ }
+ }
+}
diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java
index 9947d52d..b2d338fa 100644
--- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java
+++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java
@@ -5,6 +5,7 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import com.getindata.connectors.http.internal.retry.RetryStrategyType;
import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.*;
public class HttpLookupConnectorOptions {
@@ -74,4 +75,62 @@ public class HttpLookupConnectorOptions {
" reduction as a Duration." +
" A new access token is obtained if the token" +
" is older than it's expiry time minus this value.");
+
+ public static final ConfigOption SOURCE_LOOKUP_CONNECTION_TIMEOUT =
+ ConfigOptions.key(SOURCE_CONNECTION_TIMEOUT)
+ .durationType()
+ .noDefaultValue()
+ .withDescription("Http client connection timeout.");
+
+ public static final ConfigOption SOURCE_LOOKUP_RETRY_STRATEGY =
+ ConfigOptions.key(SOURCE_RETRY_STRATEGY_TYPE)
+ .stringType()
+ .defaultValue(RetryStrategyType.FIXED_DELAY.getCode())
+ .withDescription("Auto retry strategy type: fixed-delay (default) or exponential-delay.");
+
+ public static final ConfigOption SOURCE_LOOKUP_HTTP_SUCCESS_CODES =
+ ConfigOptions.key(SOURCE_RETRY_SUCCESS_CODES)
+ .stringType()
+ .defaultValue("2XX")
+ .withDescription("Comma separated http codes considered as success response. " +
+ "Use [1-5]XX for groups and '!' character for excluding.");
+
+ public static final ConfigOption SOURCE_LOOKUP_HTTP_RETRY_CODES =
+ ConfigOptions.key(SOURCE_RETRY_RETRY_CODES)
+ .stringType()
+ .defaultValue("500,503,504")
+ .withDescription("Comma separated http codes considered as transient errors. " +
+ "Use [1-5]XX for groups and '!' character for excluding.");
+
+ public static final ConfigOption SOURCE_LOOKUP_RETRY_FIXED_DELAY_DELAY =
+ ConfigOptions.key(SOURCE_RETRY_FIXED_DELAY_DELAY)
+ .durationType()
+ .defaultValue(Duration.ofSeconds(1))
+ .withDescription("Fixed-delay interval between retries.");
+
+ public static final ConfigOption SOURCE_LOOKUP_RETRY_EXPONENTIAL_DELAY_INITIAL_BACKOFF =
+ ConfigOptions.key(SOURCE_RETRY_EXP_DELAY_INITIAL_BACKOFF)
+ .durationType()
+ .defaultValue(Duration.ofSeconds(1))
+ .withDescription("Exponential-delay initial delay.");
+
+ public static final ConfigOption SOURCE_LOOKUP_RETRY_EXPONENTIAL_DELAY_MAX_BACKOFF =
+ ConfigOptions.key(SOURCE_RETRY_EXP_DELAY_MAX_BACKOFF)
+ .durationType()
+ .defaultValue(Duration.ofMinutes(1))
+ .withDescription("Exponential-delay maximum delay.");
+
+ public static final ConfigOption SOURCE_LOOKUP_RETRY_EXPONENTIAL_DELAY_MULTIPLIER =
+ ConfigOptions.key(SOURCE_RETRY_EXP_DELAY_MULTIPLIER)
+ .doubleType()
+ .defaultValue(1.5)
+ .withDescription("Exponential-delay multiplier.");
+
+ public static final ConfigOption SOURCE_LOOKUP_HTTP_IGNORED_RESPONSE_CODES =
+ ConfigOptions.key(SOURCE_IGNORE_RESPONSE_CODES)
+ .stringType()
+ .defaultValue("")
+ .withDescription("Comma separated http codes. Content for these responses will be ignored. " +
+ "Use [1-5]XX for groups and '!' character for excluding. " +
+ "Ignored response codes has to be a subset of " + SOURCE_RETRY_SUCCESS_CODES);
}
diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java
index 6c2edf20..0939573a 100644
--- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java
+++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java
@@ -82,6 +82,7 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext)
getLookupCache(readable)
);
}
+
protected void validateHttpLookupSourceOptions(ReadableConfig tableOptions)
throws IllegalArgumentException {
// ensure that there is an OIDC token request if we have an OIDC token endpoint
@@ -107,19 +108,33 @@ public Set> requiredOptions() {
@Override
public Set> optionalOptions() {
return Set.of(
- URL_ARGS,
- ASYNC_POLLING,
- LOOKUP_METHOD,
- REQUEST_CALLBACK_IDENTIFIER,
- LookupOptions.CACHE_TYPE,
- LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS,
- LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE,
- LookupOptions.PARTIAL_CACHE_MAX_ROWS,
- LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY,
- LookupOptions.MAX_RETRIES,
- SOURCE_LOOKUP_OIDC_AUTH_TOKEN_EXPIRY_REDUCTION,
- SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST,
- SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL
+ URL_ARGS,
+ ASYNC_POLLING,
+ LOOKUP_METHOD,
+ REQUEST_CALLBACK_IDENTIFIER,
+
+ LookupOptions.CACHE_TYPE,
+ LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS,
+ LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE,
+ LookupOptions.PARTIAL_CACHE_MAX_ROWS,
+ LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY,
+
+ SOURCE_LOOKUP_OIDC_AUTH_TOKEN_EXPIRY_REDUCTION,
+ SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST,
+ SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL,
+
+ LookupOptions.MAX_RETRIES,
+ SOURCE_LOOKUP_RETRY_STRATEGY,
+ SOURCE_LOOKUP_RETRY_FIXED_DELAY_DELAY,
+ SOURCE_LOOKUP_RETRY_EXPONENTIAL_DELAY_INITIAL_BACKOFF,
+ SOURCE_LOOKUP_RETRY_EXPONENTIAL_DELAY_MULTIPLIER,
+ SOURCE_LOOKUP_RETRY_EXPONENTIAL_DELAY_MAX_BACKOFF,
+
+ SOURCE_LOOKUP_HTTP_SUCCESS_CODES,
+ SOURCE_LOOKUP_HTTP_RETRY_CODES,
+ SOURCE_LOOKUP_HTTP_IGNORED_RESPONSE_CODES,
+
+ SOURCE_LOOKUP_CONNECTION_TIMEOUT // TODO: add request timeout from properties
);
}
diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java
index 8bab7451..9c87ff47 100644
--- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java
+++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java
@@ -59,6 +59,8 @@ public void open(FunctionContext context) throws Exception {
context.getMetricGroup()
.gauge("http-table-lookup-call-counter", () -> localHttpCallCounter.intValue());
+
+ client.open(context);
}
@Override
diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java
index 736bc8c0..8e8d7180 100644
--- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java
+++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java
@@ -12,6 +12,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
@@ -22,17 +23,22 @@
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.StringUtils;
import com.getindata.connectors.http.HttpPostRequestCallback;
import com.getindata.connectors.http.internal.HeaderPreprocessor;
import com.getindata.connectors.http.internal.PollingClient;
-import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
-import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker;
-import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker.ComposeHttpStatusCodeCheckerConfig;
-import com.getindata.connectors.http.internal.status.HttpStatusCodeChecker;
+import com.getindata.connectors.http.internal.retry.HttpClientWithRetry;
+import com.getindata.connectors.http.internal.retry.RetryConfigProvider;
+import com.getindata.connectors.http.internal.status.HttpCodesParser;
+import com.getindata.connectors.http.internal.status.HttpResponseChecker;
import com.getindata.connectors.http.internal.utils.HttpHeaderUtils;
import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.RESULT_TYPE;
+import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_HTTP_IGNORED_RESPONSE_CODES;
+import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_HTTP_RETRY_CODES;
+import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_HTTP_SUCCESS_CODES;
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST;
/**
@@ -45,67 +51,64 @@ public class JavaNetHttpPollingClient implements PollingClient {
private static final String RESULT_TYPE_SINGLE_VALUE = "single-value";
private static final String RESULT_TYPE_ARRAY = "array";
- private final HttpClient httpClient;
-
- private final HttpStatusCodeChecker statusCodeChecker;
-
+ private final HttpClientWithRetry httpClient;
private final DeserializationSchema responseBodyDecoder;
-
private final HttpRequestFactory requestFactory;
-
private final ObjectMapper objectMapper;
-
private final HttpPostRequestCallback httpPostRequestCallback;
-
private final HttpLookupConfig options;
+ private final Set ignoredErrorCodes;
public JavaNetHttpPollingClient(
HttpClient httpClient,
DeserializationSchema responseBodyDecoder,
HttpLookupConfig options,
- HttpRequestFactory requestFactory) {
+ HttpRequestFactory requestFactory) throws ConfigurationException {
- this.httpClient = httpClient;
this.responseBodyDecoder = responseBodyDecoder;
this.requestFactory = requestFactory;
-
this.objectMapper = new ObjectMapper();
this.httpPostRequestCallback = options.getHttpPostRequestCallback();
+ this.options = options;
- // TODO Inject this via constructor when implementing a response processor.
- // Processor will be injected and it will wrap statusChecker implementation.
- ComposeHttpStatusCodeCheckerConfig checkerConfig =
- ComposeHttpStatusCodeCheckerConfig.builder()
- .properties(options.getProperties())
- .whiteListPrefix(
- HttpConnectorConfigConstants.HTTP_ERROR_SOURCE_LOOKUP_CODE_WHITE_LIST
- )
- .errorCodePrefix(HttpConnectorConfigConstants.HTTP_ERROR_SOURCE_LOOKUP_CODES_LIST)
+ var config = options.getReadableConfig();
+ this.httpClient = HttpClientWithRetry.builder()
+ .httpClient(httpClient)
+ .retryConfig(RetryConfigProvider.create(config))
+ .responseChecker(new HttpResponseChecker(
+ config.get(SOURCE_LOOKUP_HTTP_SUCCESS_CODES),
+ config.get(SOURCE_LOOKUP_HTTP_RETRY_CODES)))
.build();
- this.statusCodeChecker = new ComposeHttpStatusCodeChecker(checkerConfig);
- this.options = options;
+ this.ignoredErrorCodes = HttpCodesParser.parse(config.get(SOURCE_LOOKUP_HTTP_IGNORED_RESPONSE_CODES));
+
+ validateIgnoredResponseCodes(this.httpClient.getResponseChecker());
+ }
+
+ public void open(FunctionContext context) {
+ httpClient.registerMetrics(context.getMetricGroup());
}
+
@Override
public Collection pull(RowData lookupRow) {
+ if (lookupRow == null) {
+ return Collections.emptyList();
+ }
try {
log.debug("Collection pull with Rowdata={}.", lookupRow);
return queryAndProcess(lookupRow);
} catch (Exception e) {
- log.error("Exception during HTTP request.", e);
- return Collections.emptyList();
+ throw new RuntimeException("Exception during HTTP request", e);
}
}
- // TODO Add Retry Policy And configure TimeOut from properties
private Collection queryAndProcess(RowData lookupData) throws Exception {
+ var request = requestFactory.buildLookupRequest(lookupData);
- HttpLookupSourceRequestEntry request = requestFactory.buildLookupRequest(lookupData);
- HttpResponse response = httpClient.send(
- updateHttpRequestIfRequired(request,
- HttpHeaderUtils.createOIDCHeaderPreprocessor(options.getReadableConfig())),
- BodyHandlers.ofString());
+ var oidcProcessor = HttpHeaderUtils.createOIDCHeaderPreprocessor(options.getReadableConfig());
+ var response = httpClient.send(
+ () -> updateHttpRequestIfRequired(request, oidcProcessor), BodyHandlers.ofString());
return processHttpResponse(response, request);
}
@@ -162,31 +165,15 @@ private Collection processHttpResponse(
this.httpPostRequestCallback.call(response, request, "endpoint", Collections.emptyMap());
- if (response == null) {
- return Collections.emptyList();
- }
-
- String responseBody = response.body();
- int statusCode = response.statusCode();
+ var responseBody = response.body();
- log.debug(String.format("Received status code [%s] for RestTableSource request " +
- "with Server response body [%s] ", statusCode, responseBody));
-
- if (notErrorCodeAndNotEmptyBody(responseBody, statusCode)) {
- return deserialize(responseBody);
- } else {
- log.warn(
- String.format("Returned Http status code was invalid or returned body was empty. "
- + "Status Code [%s]", statusCode)
- );
+ log.debug("Received status code [{}] for RestTableSource request with Server response body [{}] ",
+ response.statusCode(), responseBody);
+ if (StringUtils.isNullOrWhitespaceOnly(responseBody) || ignoreResponse(response)) {
return Collections.emptyList();
}
- }
-
- private boolean notErrorCodeAndNotEmptyBody(String body, int statusCode) {
- return !(StringUtils.isNullOrWhitespaceOnly(body) || statusCodeChecker.isErrorCode(
- statusCode));
+ return deserialize(responseBody);
}
@VisibleForTesting
@@ -231,4 +218,17 @@ private List deserializeArray(byte[] rawBytes) throws IOException {
}
return result;
}
+
+ private boolean ignoreResponse(HttpResponse> response) {
+ return ignoredErrorCodes.contains(response.statusCode());
+ }
+
+ private void validateIgnoredResponseCodes(HttpResponseChecker responseChecker) throws ConfigurationException {
+ for (var code : ignoredErrorCodes) {
+ if (!responseChecker.isSuccessful(code)) {
+ throw new ConfigurationException(
+ "Ignored http status code " + code + " has to be specified as success code in retry mechanism");
+ }
+ }
+ }
}
diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientFactory.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientFactory.java
index cb3c4dd2..61ffe21f 100644
--- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientFactory.java
+++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientFactory.java
@@ -4,6 +4,7 @@
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.ConfigurationException;
import com.getindata.connectors.http.internal.PollingClientFactory;
import com.getindata.connectors.http.internal.utils.JavaNetHttpClientFactory;
@@ -19,9 +20,9 @@ public JavaNetHttpPollingClientFactory(HttpRequestFactory requestFactory) {
@Override
public JavaNetHttpPollingClient createPollClient(
HttpLookupConfig options,
- DeserializationSchema schemaDecoder) {
+ DeserializationSchema schemaDecoder) throws ConfigurationException {
- HttpClient httpClient = JavaNetHttpClientFactory.createClient(options.getProperties());
+ HttpClient httpClient = JavaNetHttpClientFactory.createClient(options);
return new JavaNetHttpPollingClient(
httpClient,
diff --git a/src/main/java/com/getindata/connectors/http/internal/utils/JavaNetHttpClientFactory.java b/src/main/java/com/getindata/connectors/http/internal/utils/JavaNetHttpClientFactory.java
index 466a93c6..99976542 100644
--- a/src/main/java/com/getindata/connectors/http/internal/utils/JavaNetHttpClientFactory.java
+++ b/src/main/java/com/getindata/connectors/http/internal/utils/JavaNetHttpClientFactory.java
@@ -19,6 +19,8 @@
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
import com.getindata.connectors.http.internal.security.SecurityContext;
import com.getindata.connectors.http.internal.security.SelfSignedTrustManager;
+import com.getindata.connectors.http.internal.table.lookup.HttpLookupConfig;
+import com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions;
@Slf4j
@NoArgsConstructor(access = AccessLevel.NONE)
@@ -28,17 +30,22 @@ public class JavaNetHttpClientFactory {
* Creates Java's {@link HttpClient} instance that will be using default, JVM shared {@link
* java.util.concurrent.ForkJoinPool} for async calls.
*
- * @param properties properties used to build {@link SSLContext}
+ * @param options table configuration
* @return new {@link HttpClient} instance.
*/
- public static HttpClient createClient(Properties properties) {
+ public static HttpClient createClient(HttpLookupConfig options) {
- SSLContext sslContext = getSslContext(properties);
+ SSLContext sslContext = getSslContext(options.getProperties());
- return HttpClient.newBuilder()
+ var clientBuilder = HttpClient.newBuilder()
.followRedirects(Redirect.NORMAL)
- .sslContext(sslContext)
- .build();
+ .sslContext(sslContext);
+
+ options.getReadableConfig()
+ .getOptional(HttpLookupConnectorOptions.SOURCE_LOOKUP_CONNECTION_TIMEOUT)
+ .ifPresent(clientBuilder::connectTimeout);
+
+ return clientBuilder.build();
}
/**
diff --git a/src/test/java/com/getindata/connectors/http/internal/retry/HttpClientWithRetryTest.java b/src/test/java/com/getindata/connectors/http/internal/retry/HttpClientWithRetryTest.java
new file mode 100644
index 00000000..fe3a7254
--- /dev/null
+++ b/src/test/java/com/getindata/connectors/http/internal/retry/HttpClientWithRetryTest.java
@@ -0,0 +1,143 @@
+package com.getindata.connectors.http.internal.retry;
+
+import java.io.IOException;
+import java.net.http.HttpClient;
+import java.net.http.HttpResponse;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+
+import io.github.resilience4j.core.IntervalFunction;
+import io.github.resilience4j.retry.RetryConfig;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.getindata.connectors.http.HttpStatusCodeValidationFailedException;
+import com.getindata.connectors.http.internal.status.HttpResponseChecker;
+
+
+@SuppressWarnings("unchecked")
+@ExtendWith(MockitoExtension.class)
+class HttpClientWithRetryTest {
+
+ @Mock
+ private HttpClient httpClient;
+
+ @Mock
+ private HttpResponseChecker responseChecker;
+
+ private HttpClientWithRetry client;
+
+ @BeforeEach
+ void setup() {
+ var retryConfig = RetryConfig.custom()
+ .maxAttempts(3)
+ .intervalFunction(IntervalFunction.of(1))
+ .build();
+ client = new HttpClientWithRetry(httpClient, retryConfig, responseChecker);
+ }
+
+ @Test
+ void shouldRetryOnIOException() throws IOException, InterruptedException, HttpStatusCodeValidationFailedException {
+ var response = mock(HttpResponse.class);
+ when(httpClient.send(any(), any())).thenThrow(IOException.class).thenReturn(response);
+ when(responseChecker.isSuccessful(response)).thenReturn(true);
+
+ var result = client.send(mock(Supplier.class), mock(HttpResponse.BodyHandler.class));
+
+ verify(httpClient, times(2)).send(any(), any());
+ assertEquals(response, result);
+ }
+
+ @Test
+ void shouldRetryOnTemporalException()
+ throws IOException, InterruptedException, HttpStatusCodeValidationFailedException {
+ var responseA = mock(HttpResponse.class);
+ var responseB = mock(HttpResponse.class);
+ when(httpClient.send(any(), any())).thenReturn(responseA, responseA, responseB);
+ when(responseChecker.isTemporalError(responseA)).thenReturn(true);
+ when(responseChecker.isTemporalError(responseB)).thenReturn(false);
+ when(responseChecker.isSuccessful(responseB)).thenReturn(true);
+
+ var result = client.send(mock(Supplier.class), mock(HttpResponse.BodyHandler.class));
+
+ verify(httpClient, times(3)).send(any(), any());
+ assertEquals(responseB, result);
+ }
+
+ @Test
+ void shouldFailAfterExceedingMaxRetryAttempts() throws IOException, InterruptedException {
+ var response = mock(HttpResponse.class);
+ when(httpClient.send(any(), any())).thenReturn(response);
+ when(responseChecker.isSuccessful(response)).thenReturn(false);
+ when(responseChecker.isTemporalError(response)).thenReturn(true);
+
+ var exception = assertThrows(HttpStatusCodeValidationFailedException.class,
+ () -> client.send(mock(Supplier.class), mock(HttpResponse.BodyHandler.class)));
+
+ verify(httpClient, times(3)).send(any(), any());
+ assertEquals(response, exception.getResponse());
+ }
+
+ @Test
+ void shouldFailOnError() throws IOException, InterruptedException {
+ var response = mock(HttpResponse.class);
+ when(httpClient.send(any(), any())).thenReturn(response);
+ when(responseChecker.isSuccessful(response)).thenReturn(false);
+ when(responseChecker.isTemporalError(response)).thenReturn(false);
+
+ assertThrows(HttpStatusCodeValidationFailedException.class,
+ () -> client.send(mock(Supplier.class), mock(HttpResponse.BodyHandler.class)));
+
+ verify(httpClient, times(1)).send(any(), any());
+ }
+
+ @Test
+ void shouldHandleUncheckedExceptionFromRetry() throws IOException, InterruptedException {
+ when(httpClient.send(any(), any())).thenThrow(RuntimeException.class);
+
+ assertThrows(RuntimeException.class,
+ () -> client.send(mock(Supplier.class), mock(HttpResponse.BodyHandler.class)));
+
+ verify(httpClient, times(1)).send(any(), any());
+ }
+
+ @Test
+ void shouldSendRequestAndProcessSuccessfulResponse()
+ throws IOException, InterruptedException, HttpStatusCodeValidationFailedException {
+ var response = mock(HttpResponse.class);
+ when(httpClient.send(any(), any())).thenReturn(response);
+ when(responseChecker.isSuccessful(response)).thenReturn(true);
+
+ var result = client.send(mock(Supplier.class), mock(HttpResponse.BodyHandler.class));
+
+ verify(httpClient).send(any(), any());
+ assertEquals(response, result);
+ }
+
+ private static Stream> failures() {
+ return Stream.of(RuntimeException.class, InterruptedException.class);
+ }
+
+ @ParameterizedTest
+ @MethodSource("failures")
+ void shouldFailOnException(Class extends Throwable> exceptionClass) throws IOException, InterruptedException {
+ when(httpClient.send(any(), any())).thenThrow(exceptionClass);
+
+ assertThrows(exceptionClass,
+ () -> client.send(mock(Supplier.class), mock(HttpResponse.BodyHandler.class)));
+
+ verify(httpClient).send(any(), any());
+ }
+}
diff --git a/src/test/java/com/getindata/connectors/http/internal/retry/RetryConfigProviderTest.java b/src/test/java/com/getindata/connectors/http/internal/retry/RetryConfigProviderTest.java
new file mode 100644
index 00000000..db0fa8a9
--- /dev/null
+++ b/src/test/java/com/getindata/connectors/http/internal/retry/RetryConfigProviderTest.java
@@ -0,0 +1,63 @@
+package com.getindata.connectors.http.internal.retry;
+
+import java.util.stream.IntStream;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+
+class RetryConfigProviderTest {
+
+ @Test
+ void verifyFixedDelayRetryConfig() {
+ var config = new Configuration();
+ config.setString("gid.connector.http.source.lookup.retry-strategy.type", "fixed-delay");
+ config.setString("gid.connector.http.source.lookup.retry-strategy.fixed-delay.delay", "10s");
+ config.setInteger("lookup.max-retries", 12);
+
+ var retryConfig = RetryConfigProvider.create(config);
+
+ assertEquals(13, retryConfig.getMaxAttempts());
+ IntStream.range(1, 12).forEach(attempt ->
+ assertEquals(10000, retryConfig.getIntervalFunction().apply(attempt))
+ );
+ }
+
+ @Test
+ void verifyExponentialDelayConfig() {
+ var config = new Configuration();
+ config.setString("gid.connector.http.source.lookup.retry-strategy.type", "exponential-delay");
+ config.setString("gid.connector.http.source.lookup.retry-strategy.exponential-delay.initial-backoff", "15ms");
+ config.setString("gid.connector.http.source.lookup.retry-strategy.exponential-delay.max-backoff", "120ms");
+ config.setInteger("gid.connector.http.source.lookup.retry-strategy.exponential-delay.backoff-multiplier", 2);
+ config.setInteger("lookup.max-retries", 6);
+
+ var retryConfig = RetryConfigProvider.create(config);
+ var intervalFunction = retryConfig.getIntervalFunction();
+
+ assertEquals(7, retryConfig.getMaxAttempts());
+ assertEquals(15, intervalFunction.apply(1));
+ assertEquals(30, intervalFunction.apply(2));
+ assertEquals(60, intervalFunction.apply(3));
+ assertEquals(120, intervalFunction.apply(4));
+ assertEquals(120, intervalFunction.apply(5));
+ assertEquals(120, intervalFunction.apply(6));
+ }
+
+ @Test
+ void failWhenStrategyIsUnsupported() {
+ var config = new Configuration();
+ config.setString("gid.connector.http.source.lookup.retry-strategy.type", "dummy");
+
+ try (var mockedStatic = mockStatic(RetryStrategyType.class)) {
+ var dummyStrategy = mock(RetryStrategyType.class);
+ mockedStatic.when(() -> RetryStrategyType.fromCode("dummy")).thenReturn(dummyStrategy);
+
+ assertThrows(IllegalArgumentException.class,
+ () -> RetryConfigProvider.create(config));
+ }
+ }
+}
diff --git a/src/test/java/com/getindata/connectors/http/internal/retry/RetryStrategyTypeTest.java b/src/test/java/com/getindata/connectors/http/internal/retry/RetryStrategyTypeTest.java
new file mode 100644
index 00000000..6a411367
--- /dev/null
+++ b/src/test/java/com/getindata/connectors/http/internal/retry/RetryStrategyTypeTest.java
@@ -0,0 +1,48 @@
+package com.getindata.connectors.http.internal.retry;
+
+import java.util.stream.Stream;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class RetryStrategyTypeTest {
+
+ static Stream inputArguments() {
+ return Stream.of(
+ Arguments.of("FIXED-DELAY", RetryStrategyType.FIXED_DELAY),
+ Arguments.of("fixed-delay", RetryStrategyType.FIXED_DELAY),
+ Arguments.of("exponential-delay", RetryStrategyType.EXPONENTIAL_DELAY),
+ Arguments.of("EXPONENTIAL-DELAY", RetryStrategyType.EXPONENTIAL_DELAY)
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("inputArguments")
+ void parseFromCodes(String code, RetryStrategyType expectedType) {
+ var result = RetryStrategyType.fromCode(code);
+
+ assertEquals(expectedType, result);
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {
+ "fixed_delay",
+ "FIXED_DELAY",
+ "ABC",
+ "FIXED-DELA",
+ "exponential_delay"
+ })
+ void failWhenCodeIsIllegal(String code) {
+ assertThrows(IllegalArgumentException.class, () -> RetryStrategyType.fromCode(code));
+ }
+
+ @Test
+ void failWhenCodeIsNull() {
+ assertThrows(NullPointerException.class, () -> RetryStrategyType.fromCode(null));
+ }
+}
diff --git a/src/test/java/com/getindata/connectors/http/internal/status/HttpCodesParserTest.java b/src/test/java/com/getindata/connectors/http/internal/status/HttpCodesParserTest.java
new file mode 100644
index 00000000..460a3880
--- /dev/null
+++ b/src/test/java/com/getindata/connectors/http/internal/status/HttpCodesParserTest.java
@@ -0,0 +1,92 @@
+package com.getindata.connectors.http.internal.status;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.Singular;
+import org.apache.flink.util.ConfigurationException;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class HttpCodesParserTest {
+
+ @ParameterizedTest
+ @ValueSource(strings = {
+ "6XX",
+ "1XXX",
+ "600",
+ "99",
+ "1XX,11",
+ "abc",
+ "!1XX",
+ "1 2 3",
+ "1X X"
+ })
+ void failWhenCodeExpressionIsInvalid(String codeExpression) {
+ assertThrows(ConfigurationException.class,
+ () -> HttpCodesParser.parse(codeExpression));
+ }
+
+ private static Stream inputArgsStream() {
+ return Stream.of(
+ InputArgs.builder().codeExpression("2XX,404,!203,!205")
+ .expectedCodes(range(200, 300, 203, 205))
+ .expectedCode(404)
+ .build(),
+ InputArgs.builder().codeExpression(" 400, 401 , 403, 500,501, !502")
+ .expectedCodes(List.of(400, 401, 403, 500, 501))
+ .build(),
+ InputArgs.builder().codeExpression("!405,1XX, 2XX ,404,!202,405")
+ .expectedCodes(range(100, 300, 202))
+ .expectedCode(404)
+ .build(),
+ InputArgs.builder().codeExpression("!404, 4XX")
+ .expectedCodes(range(400, 500, 404))
+ .build(),
+ InputArgs.builder().codeExpression("2xX,!401,3Xx,4xx")
+ .expectedCodes(range(200, 500, 401))
+ .build()
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("inputArgsStream")
+ void parseCodeExpressionTest(InputArgs inputArgs) throws ConfigurationException {
+ var expectedCodes = inputArgs.getExpectedCodes();
+
+ var result = HttpCodesParser.parse(inputArgs.getCodeExpression());
+
+ for (var code : expectedCodes) {
+ assertTrue(result.contains(code), "Missing code " + code);
+ }
+ for (var code : result) {
+ assertTrue(expectedCodes.contains(code), "Improper code " + code);
+ }
+ }
+
+ private static List range(int start, int endExclusive, int... exclusions) {
+ var exclusionSet = Arrays.stream(exclusions).boxed().collect(Collectors.toSet());
+ return IntStream.range(start, endExclusive).boxed()
+ .filter(item -> !exclusionSet.contains(item))
+ .collect(Collectors.toList());
+ }
+
+ @Builder
+ @Getter
+ private static class InputArgs {
+ @NonNull
+ private final String codeExpression;
+ @Singular
+ private final Set expectedCodes;
+ }
+}
diff --git a/src/test/java/com/getindata/connectors/http/internal/status/HttpResponseCheckerTest.java b/src/test/java/com/getindata/connectors/http/internal/status/HttpResponseCheckerTest.java
new file mode 100644
index 00000000..4cc27052
--- /dev/null
+++ b/src/test/java/com/getindata/connectors/http/internal/status/HttpResponseCheckerTest.java
@@ -0,0 +1,106 @@
+package com.getindata.connectors.http.internal.status;
+
+import java.net.http.HttpResponse;
+import java.util.Set;
+import java.util.stream.Stream;
+import static java.util.Collections.emptySet;
+
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import org.apache.flink.util.ConfigurationException;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class HttpResponseCheckerTest {
+
+ @Test
+ void failWhenTheSameCodeIsMarkedSuccessAndError() {
+ assertThrows(ConfigurationException.class,
+ () -> new HttpResponseChecker(Set.of(404), Set.of(404)));
+ }
+
+ @Test
+ void failWhenSuccessListIsEmpty() {
+ assertThrows(ConfigurationException.class,
+ () -> new HttpResponseChecker(emptySet(), Set.of(500)));
+ }
+
+ private static Stream testData() {
+ return Stream.of(
+ new InputArgs(404, CodeType.SUCCESSFUL),
+ new InputArgs(200, CodeType.SUCCESSFUL),
+ new InputArgs(400, CodeType.TEMPORAL_ERROR),
+ new InputArgs(408, CodeType.TEMPORAL_ERROR),
+ new InputArgs(501, CodeType.TEMPORAL_ERROR),
+ new InputArgs(501, CodeType.TEMPORAL_ERROR),
+ new InputArgs(502, CodeType.TEMPORAL_ERROR),
+ new InputArgs(202, CodeType.ERROR),
+ new InputArgs(409, CodeType.ERROR),
+ new InputArgs(100, CodeType.ERROR),
+ new InputArgs(301, CodeType.ERROR));
+ }
+
+ @ParameterizedTest
+ @MethodSource("testData")
+ void verifyCodes(InputArgs inputArgs) throws ConfigurationException {
+ var checker = new HttpResponseChecker("2XX,404,!202", "4XX,!404,500,501,502,!409");
+ var response = inputArgs.getResponse();
+
+ switch (inputArgs.getCodeType()) {
+ case SUCCESSFUL:
+ assertSuccessful(checker, response);
+ break;
+ case TEMPORAL_ERROR:
+ assertTemporalError(checker, response);
+ break;
+ case ERROR:
+ assertError(checker, response);
+ break;
+ default:
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private void assertSuccessful(HttpResponseChecker checker, HttpResponse> response) {
+ assertTrue(checker.isSuccessful(response));
+ assertFalse(checker.isTemporalError(response));
+ }
+
+ private void assertTemporalError(HttpResponseChecker checker, HttpResponse> response) {
+ assertFalse(checker.isSuccessful(response));
+ assertTrue(checker.isTemporalError(response));
+ }
+
+ private void assertError(HttpResponseChecker checker, HttpResponse> response) {
+ assertFalse(checker.isSuccessful(response));
+ assertFalse(checker.isTemporalError(response));
+ }
+
+ @RequiredArgsConstructor
+ @Getter
+ private static class InputArgs {
+ @NonNull
+ private final Integer code;
+ @NonNull
+ private final CodeType codeType;
+
+ HttpResponse> getResponse() {
+ var response = mock(HttpResponse.class);
+ when(response.statusCode()).thenReturn(code);
+ return response;
+ }
+ }
+
+ private enum CodeType {
+ SUCCESSFUL, TEMPORAL_ERROR, ERROR
+ }
+}
+
+
diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceITCaseTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceITCaseTest.java
index 3b2f0b29..aa099087 100644
--- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceITCaseTest.java
+++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceITCaseTest.java
@@ -15,6 +15,7 @@
import com.github.tomakehurst.wiremock.client.MappingBuilder;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import com.github.tomakehurst.wiremock.matching.StringValuePattern;
+import com.github.tomakehurst.wiremock.stubbing.Scenario;
import com.github.tomakehurst.wiremock.stubbing.StubMapping;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.RuntimeExecutionMode;
@@ -40,20 +41,14 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
-import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
-import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
-import static com.github.tomakehurst.wiremock.client.WireMock.get;
-import static com.github.tomakehurst.wiremock.client.WireMock.matching;
-import static com.github.tomakehurst.wiremock.client.WireMock.matchingJsonPath;
-import static com.github.tomakehurst.wiremock.client.WireMock.post;
-import static com.github.tomakehurst.wiremock.client.WireMock.put;
-import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
-import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.*;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertAll;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
@Slf4j
-public class HttpLookupTableSourceITCaseTest {
+class HttpLookupTableSourceITCaseTest {
private static final int SERVER_PORT = 9090;
@@ -85,7 +80,7 @@ public class HttpLookupTableSourceITCaseTest {
@SuppressWarnings("unchecked")
@BeforeEach
- public void setup() {
+ void setup() {
File keyStoreFile = new File(SERVER_KEYSTORE_PATH);
File trustStoreFile = new File(SERVER_TRUSTSTORE_PATH);
@@ -110,18 +105,19 @@ public void setup() {
config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
env.configure(config, getClass().getClassLoader());
env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
+ env.setParallelism(1); // wire mock server has problem with scenario state during parallel execution
tEnv = StreamTableEnvironment.create(env);
}
@AfterEach
- public void tearDown() {
+ void tearDown() {
wireMockServer.stop();
}
@ParameterizedTest
@ValueSource(strings = {"", "GET", "POST", "PUT"})
- public void testHttpLookupJoin(String methodName) throws Exception {
+ void testHttpLookupJoin(String methodName) throws Exception {
// GIVEN
if (StringUtils.isNullOrWhitespaceOnly(methodName) || methodName.equalsIgnoreCase("GET")) {
@@ -160,14 +156,14 @@ public void testHttpLookupJoin(String methodName) throws Exception {
+ ")";
// WHEN
- SortedSet rows = testLookupJoin(lookupTable);
+ SortedSet rows = testLookupJoin(lookupTable, 4);
// THEN
assertEnrichedRows(rows);
}
@Test
- public void testHttpLookupJoinNoDataFromEndpoint() throws Exception {
+ void testHttpLookupJoinNoDataFromEndpoint() {
// GIVEN
setupServerStubEmptyResponse(wireMockServer);
@@ -193,20 +189,108 @@ public void testHttpLookupJoinNoDataFromEndpoint() throws Exception {
+ ")";
// WHEN/THEN
+ assertThrows(TimeoutException.class, () -> testLookupJoin(lookupTable, 4));
+ }
- boolean timeoutException = false;
- try {
- testLookupJoin(lookupTable);
- } catch (TimeoutException e) {
- // we expect no data produced by query so framework should time out.
- timeoutException = true;
- }
+ @Test
+ void testLookupWithRetry() throws Exception {
+ wireMockServer.stubFor(get(urlPathEqualTo(ENDPOINT))
+ .inScenario("retry")
+ .whenScenarioStateIs(Scenario.STARTED)
+ .withHeader("Content-Type", equalTo("application/json"))
+ .withQueryParam("id", matching("[0-9]+"))
+ .withQueryParam("id2", matching("[0-9]+"))
+ .willReturn(aResponse().withBody(new byte[0]).withStatus(501))
+ .willSetStateTo("temporal_issue_gone")
+ );
+ wireMockServer.stubFor(get(urlPathEqualTo(ENDPOINT))
+ .inScenario("retry")
+ .whenScenarioStateIs("temporal_issue_gone")
+ .withHeader("Content-Type", equalTo("application/json"))
+ .withQueryParam("id", matching("[0-9]+"))
+ .withQueryParam("id2", matching("[0-9]+"))
+ .willReturn(aResponse().withTransformers(JsonTransform.NAME).withStatus(200))
+ );
+
+ var lookupTable =
+ "CREATE TABLE Customers ("
+ + "id STRING,"
+ + "id2 STRING,"
+ + "msg STRING,"
+ + "uuid STRING,"
+ + "details ROW<"
+ + "isActive BOOLEAN,"
+ + "nestedDetails ROW<"
+ + "balance STRING"
+ + ">"
+ + ">"
+ + ") WITH ("
+ + "'format' = 'json',"
+ + "'connector' = 'rest-lookup',"
+ + "'url' = 'http://localhost:9090/client',"
+ + "'lookup.max-retries' = '3',"
+ + "'gid.connector.http.source.lookup.header.Content-Type' = 'application/json',"
+ + "'gid.connector.http.source.lookup.retry-strategy.type' = 'fixed-delay',"
+ + "'gid.connector.http.source.lookup.retry-strategy.fixed-delay.delay' = '1ms',"
+ + "'gid.connector.http.source.lookup.success-codes' = '2XX',"
+ + "'gid.connector.http.source.lookup.retry-codes' = '501'"
+ + ")";
- assertThat(timeoutException).isTrue();
+ var result = testLookupJoin(lookupTable, 1);
+
+ assertEquals(1, result.size());
+ wireMockServer.verify(2, getRequestedFor(urlPathEqualTo(ENDPOINT)));
}
@Test
- public void testHttpsMTlsLookupJoin() throws Exception {
+ void testLookupIgnoreResponse() throws Exception {
+ wireMockServer.stubFor(get(urlPathEqualTo(ENDPOINT))
+ .inScenario("404_on_first")
+ .whenScenarioStateIs(Scenario.STARTED)
+ .withHeader("Content-Type", equalTo("application/json"))
+ .withQueryParam("id", matching("[0-9]+"))
+ .withQueryParam("id2", matching("[0-9]+"))
+ .willReturn(aResponse().withBody(JsonTransform.NAME).withStatus(404))
+ .willSetStateTo("second_request")
+ );
+ wireMockServer.stubFor(get(urlPathEqualTo(ENDPOINT))
+ .inScenario("404_on_first")
+ .whenScenarioStateIs("second_request")
+ .withHeader("Content-Type", equalTo("application/json"))
+ .withQueryParam("id", matching("[0-9]+"))
+ .withQueryParam("id2", matching("[0-9]+"))
+ .willReturn(aResponse().withTransformers(JsonTransform.NAME).withStatus(200))
+ );
+
+ var lookupTable =
+ "CREATE TABLE Customers ("
+ + "id STRING,"
+ + "id2 STRING,"
+ + "msg STRING,"
+ + "uuid STRING,"
+ + "details ROW<"
+ + "isActive BOOLEAN,"
+ + "nestedDetails ROW<"
+ + "balance STRING"
+ + ">"
+ + ">"
+ + ") WITH ("
+ + "'format' = 'json',"
+ + "'connector' = 'rest-lookup',"
+ + "'url' = 'http://localhost:9090/client',"
+ + "'gid.connector.http.source.lookup.header.Content-Type' = 'application/json',"
+ + "'gid.connector.http.source.lookup.success-codes' = '2XX,404',"
+ + "'gid.connector.http.source.lookup.ignored-response-codes' = '404'"
+ + ")";
+
+ var result = testLookupJoin(lookupTable, 3);
+
+ assertEquals(2, result.size());
+ wireMockServer.verify(3, getRequestedFor(urlPathEqualTo(ENDPOINT)));
+ }
+
+ @Test
+ void testHttpsMTlsLookupJoin() throws Exception {
// GIVEN
File serverTrustedCert = new File(CERTS_PATH + "ca.crt");
@@ -243,14 +327,14 @@ public void testHttpsMTlsLookupJoin() throws Exception {
);
// WHEN
- SortedSet rows = testLookupJoin(lookupTable);
+ SortedSet rows = testLookupJoin(lookupTable, 4);
// THEN
assertEnrichedRows(rows);
}
@Test
- public void testLookupJoinProjectionPushDown() throws Exception {
+ void testLookupJoinProjectionPushDown() throws Exception {
// GIVEN
setUpServerBodyStub(
@@ -327,7 +411,7 @@ public void testLookupJoinProjectionPushDown() throws Exception {
}
@Test
- public void testLookupJoinProjectionPushDownNested() throws Exception {
+ void testLookupJoinProjectionPushDownNested() throws Exception {
// GIVEN
setUpServerBodyStub(
@@ -404,7 +488,7 @@ public void testLookupJoinProjectionPushDownNested() throws Exception {
}
@Test
- public void testLookupJoinOnRowType() throws Exception {
+ void testLookupJoinOnRowType() throws Exception {
// GIVEN
setUpServerBodyStub(
@@ -473,7 +557,7 @@ public void testLookupJoinOnRowType() throws Exception {
}
@Test
- public void testLookupJoinOnRowTypeAndRootColumn() throws Exception {
+ void testLookupJoinOnRowTypeAndRootColumn() throws Exception {
// GIVEN
setUpServerBodyStub(
@@ -544,8 +628,7 @@ public void testLookupJoinOnRowTypeAndRootColumn() throws Exception {
}
@Test
- public void testLookupJoinOnRowWithRowType() throws Exception {
-
+ void testLookupJoinOnRowWithRowType() throws Exception {
testLookupJoinOnRowWithRowTypeImpl();
}
@@ -557,7 +640,7 @@ public void testLookupJoinOnRowWithRowType() throws Exception {
"Basic dXNlcjpwYXNzd29yZA==, Basic dXNlcjpwYXNzd29yZA==, true",
"Bearer dXNlcjpwYXNzd29yZA==, Bearer dXNlcjpwYXNzd29yZA==, true"
})
- public void testLookupWithUseRawAuthHeader(
+ void testLookupWithUseRawAuthHeader(
String authHeaderRawValue,
String expectedAuthHeaderValue,
boolean useRawAuthHeader) throws Exception {
@@ -659,7 +742,7 @@ private void testLookupJoinOnRowWithRowTypeImpl(
}
@Test
- public void testNestedLookupJoinWithoutCast() throws Exception {
+ void testNestedLookupJoinWithoutCast() throws Exception {
// TODO ADD MORE ASSERTS
// GIVEN
@@ -780,7 +863,7 @@ public void testNestedLookupJoinWithoutCast() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
- public void testHttpLookupJoinWithCache(boolean isAsync) throws Exception {
+ void testHttpLookupJoinWithCache(boolean isAsync) throws Exception {
// GIVEN
LookupCacheManager.keepCacheOnRelease(true);
@@ -810,7 +893,7 @@ public void testHttpLookupJoinWithCache(boolean isAsync) throws Exception {
+ ")";
// WHEN
- SortedSet rows = testLookupJoin(lookupTable);
+ SortedSet rows = testLookupJoin(lookupTable, 4);
// THEN
try {
@@ -839,7 +922,7 @@ private LookupCache getCache() {
return managedCaches.get(managedCaches.keySet().iterator().next()).getCache();
}
- private @NotNull SortedSet testLookupJoin(String lookupTable) throws Exception {
+ private @NotNull SortedSet testLookupJoin(String lookupTable, int maxRows) throws Exception {
String sourceTable =
"CREATE TABLE Orders ("
@@ -851,10 +934,10 @@ private LookupCache getCache() {
+ "'rows-per-second' = '1',"
+ "'fields.id.kind' = 'sequence',"
+ "'fields.id.start' = '1',"
- + "'fields.id.end' = '5',"
+ + "'fields.id.end' = '" + maxRows + "',"
+ "'fields.id2.kind' = 'sequence',"
+ "'fields.id2.start' = '2',"
- + "'fields.id2.end' = '5'"
+ + "'fields.id2.end' = '" + (maxRows + 1) + "'"
+ ")";
tEnv.executeSql(sourceTable);
diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java
index 95cc0f9f..a6c57782 100644
--- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java
+++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientConnectionTest.java
@@ -22,6 +22,7 @@
import org.apache.flink.table.factories.DynamicTableFactory.Context;
import org.apache.flink.table.runtime.connector.source.LookupRuntimeProviderContext;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.ConfigurationException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
@@ -45,6 +46,7 @@
import static com.github.tomakehurst.wiremock.client.WireMock.putRequestedFor;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
@@ -54,6 +56,8 @@
import com.getindata.connectors.http.internal.utils.SerializationSchemaUtils;
import static com.getindata.connectors.http.TestHelper.readTestFile;
import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.RESULT_TYPE;
+import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_HTTP_IGNORED_RESPONSE_CODES;
+import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_HTTP_SUCCESS_CODES;
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupTableSourceFactory.row;
@ExtendWith(MockitoExtension.class)
@@ -62,7 +66,7 @@ class JavaNetHttpPollingClientConnectionTest {
private static final String SAMPLES_FOLDER = "/http/";
private static final String SAMPLES_FOLDER_ARRAY_RESULT = "/http-array-result/";
private static final String SAMPLES_FOLDER_ARRAY_RESULT_WITH_NULLS =
- "/http-array-result-with-nulls/";
+ "/http-array-result-with-nulls/";
private static final String ENDPOINT = "/service";
@@ -77,60 +81,64 @@ class JavaNetHttpPollingClientConnectionTest {
private Properties properties;
+ private Configuration configuration;
+
private RowData lookupRowData;
private DataType lookupPhysicalDataType;
@BeforeAll
- public static void setUpAll() {
+ static void setUpAll() {
wireMockServer = new WireMockServer();
wireMockServer.start();
}
@AfterAll
- public static void cleanUpAll() {
+ static void cleanUpAll() {
if (wireMockServer != null) {
wireMockServer.stop();
}
}
@BeforeEach
- public void setUp() {
+ void setUp() {
int[][] lookupKey = {{}};
this.dynamicTableSourceContext = new LookupRuntimeProviderContext(lookupKey);
this.lookupRowData = GenericRowData.of(
- StringData.fromString("1"),
- StringData.fromString("2")
+ StringData.fromString("1"),
+ StringData.fromString("2")
);
this.lookupPhysicalDataType = row(List.of(
- DataTypes.FIELD("id", DataTypes.STRING()),
- DataTypes.FIELD("uuid", DataTypes.STRING())
- )
+ DataTypes.FIELD("id", DataTypes.STRING()),
+ DataTypes.FIELD("uuid", DataTypes.STRING())
+ )
);
this.properties = new Properties();
this.properties.setProperty(
- HttpConnectorConfigConstants.LOOKUP_SOURCE_HEADER_PREFIX + "Content-Type",
- "application/json"
+ HttpConnectorConfigConstants.LOOKUP_SOURCE_HEADER_PREFIX + "Content-Type",
+ "application/json"
);
this.properties.setProperty(RESULT_TYPE, "single-value");
+
+ this.configuration = new Configuration();
}
@AfterEach
- public void cleanUp() {
+ void cleanUp() {
if (stubMapping != null && wireMockServer != null) {
wireMockServer.removeStub(stubMapping);
}
}
@Test
- void shouldQuery200WithParams() {
+ void shouldQuery200WithParams() throws ConfigurationException {
// GIVEN
this.stubMapping = setUpServerStub(200);
- JavaNetHttpPollingClient pollingClient = setUpPollingClient(getBaseUrl());
+ JavaNetHttpPollingClient pollingClient = setUpPollingClient();
// WHEN
Collection results = pollingClient.pull(lookupRowData);
@@ -142,7 +150,7 @@ void shouldQuery200WithParams() {
RowData result = results.iterator().next();
assertThat(result.getArity()).isEqualTo(4);
assertThat(result.getString(1)
- .toString()).isEqualTo("Returned HTTP message for parameter PARAM, COUNTER");
+ .toString()).isEqualTo("Returned HTTP message for parameter PARAM, COUNTER");
RowData detailsRow = result.getRow(3, 2);
assertThat(detailsRow.getBoolean(0)).isEqualTo(true);
@@ -153,16 +161,11 @@ void shouldQuery200WithParams() {
@ParameterizedTest
@ValueSource(strings = {"PUT", "POST"})
- void shouldQuery200WithBodyParams(String methodName) {
+ void shouldQuery200WithBodyParams(String methodName) throws ConfigurationException {
// GIVEN
this.stubMapping = setUpServerBodyStub(methodName);
- JavaNetHttpPollingClient pollingClient =
- setUpPollingClient(
- getBaseUrl(),
- properties,
- setUpBodyRequestFactory(methodName, properties)
- );
+ JavaNetHttpPollingClient pollingClient = setUpPollingClient(setUpBodyRequestFactory(methodName));
// WHEN
Collection results = pollingClient.pull(lookupRowData);
@@ -181,7 +184,7 @@ void shouldQuery200WithBodyParams(String methodName) {
RowData result = results.iterator().next();
assertThat(result.getArity()).isEqualTo(4);
assertThat(result.getString(1)
- .toString()).isEqualTo("Returned HTTP message for parameter PARAM, COUNTER");
+ .toString()).isEqualTo("Returned HTTP message for parameter PARAM, COUNTER");
RowData detailsRow = result.getRow(3, 2);
assertThat(detailsRow.getBoolean(0)).isEqualTo(true);
@@ -192,23 +195,20 @@ void shouldQuery200WithBodyParams(String methodName) {
private static Stream clientErrorCodeConfig() {
return Stream.of(
- Arguments.of(prepareErrorCodeProperties("4XX", ""), false),
- Arguments.of(prepareErrorCodeProperties("2XX", " "), true),
- Arguments.of(prepareErrorCodeProperties("2xx", "201"), false)
+ Arguments.of("2XX", "", false),
+ Arguments.of("2XX", "201", true),
+ Arguments.of("200,201,202,", "202", false)
);
}
@Test
- void shouldQuery200WithArrayResult() {
+ void shouldQuery200WithArrayResult() throws ConfigurationException {
// GIVEN
this.stubMapping = setUpServerStubArrayResult(200);
-
- Properties properties = new Properties();
- properties.putAll(this.properties);
properties.setProperty(RESULT_TYPE, "array");
// WHEN
- JavaNetHttpPollingClient pollingClient = setUpPollingClient(getBaseUrl(), properties);
+ JavaNetHttpPollingClient pollingClient = setUpPollingClient();
// WHEN
Collection results = pollingClient.pull(lookupRowData);
@@ -236,16 +236,13 @@ void shouldQuery200WithArrayResult() {
}
@Test
- void shouldQuery200WithArrayResultWithNulls() {
+ void shouldQuery200WithArrayResultWithNulls() throws ConfigurationException {
// GIVEN
this.stubMapping = setUpServerStubArrayResultWithNulls(200);
-
- Properties properties = new Properties();
- properties.putAll(this.properties);
properties.setProperty(RESULT_TYPE, "array");
// WHEN
- JavaNetHttpPollingClient pollingClient = setUpPollingClient(getBaseUrl(), properties);
+ JavaNetHttpPollingClient pollingClient = setUpPollingClient();
// WHEN
Collection results = pollingClient.pull(lookupRowData);
@@ -268,16 +265,16 @@ void shouldQuery200WithArrayResultWithNulls() {
@ParameterizedTest
@MethodSource("clientErrorCodeConfig")
void shouldHandleCodeBasedOnConfiguration(
- Properties properties,
- boolean isExpectedResponseEmpty) {
+ String successCodesExpression,
+ String ignoredResponseCodesExpression,
+ boolean isExpectedResponseEmpty
+ ) throws ConfigurationException {
// GIVEN
this.stubMapping = setUpServerStub(201);
- JavaNetHttpPollingClient pollingClient = setUpPollingClient(
- getBaseUrl(),
- properties,
- setUpGetRequestFactory(properties)
- );
+ configuration.setString(SOURCE_LOOKUP_HTTP_SUCCESS_CODES, successCodesExpression);
+ configuration.setString(SOURCE_LOOKUP_HTTP_IGNORED_RESPONSE_CODES, ignoredResponseCodesExpression);
+ JavaNetHttpPollingClient pollingClient = setUpPollingClient();
// WHEN
Collection results = pollingClient.pull(lookupRowData);
@@ -287,27 +284,21 @@ void shouldHandleCodeBasedOnConfiguration(
}
@Test
- void shouldHandleServerError() {
+ void shouldFailOnServerError() throws ConfigurationException {
// GIVEN
this.stubMapping = setUpServerStub(500);
- JavaNetHttpPollingClient pollingClient = setUpPollingClient(getBaseUrl());
+ JavaNetHttpPollingClient pollingClient = setUpPollingClient();
- // WHEN
- Collection results = pollingClient.pull(lookupRowData);
-
- // THEN
- wireMockServer.verify(RequestPatternBuilder.forCustomMatcher(stubMapping.getRequest()));
-
- assertThat(results.isEmpty()).isTrue();
+ assertThrows(RuntimeException.class, () -> pollingClient.pull(lookupRowData));
}
@Test
- void shouldProcessWithMissingArguments() {
+ void shouldProcessWithMissingArguments() throws ConfigurationException {
// GIVEN
this.stubMapping = setUpServerStub(200);
- JavaNetHttpPollingClient pollingClient = setUpPollingClient(getBaseUrl());
+ JavaNetHttpPollingClient pollingClient = setUpPollingClient();
// WHEN
Collection results = pollingClient.pull(null);
@@ -318,22 +309,19 @@ void shouldProcessWithMissingArguments() {
@ParameterizedTest
@CsvSource({
- "user:password, false",
- "Basic dXNlcjpwYXNzd29yZA==, false",
- "Basic dXNlcjpwYXNzd29yZA==, true"
+ "user:password, false",
+ "Basic dXNlcjpwYXNzd29yZA==, false",
+ "Basic dXNlcjpwYXNzd29yZA==, true"
})
public void shouldConnectWithBasicAuth(String authorizationHeaderValue,
- boolean useRawAuthHeader) {
+ boolean useRawAuthHeader) throws ConfigurationException {
// GIVEN
this.stubMapping = setupServerStubForBasicAuth();
- Properties properties = new Properties();
- properties.putAll(this.properties);
-
properties.setProperty(
- HttpConnectorConfigConstants.LOOKUP_SOURCE_HEADER_PREFIX + "Authorization",
- authorizationHeaderValue
+ HttpConnectorConfigConstants.LOOKUP_SOURCE_HEADER_PREFIX + "Authorization",
+ authorizationHeaderValue
);
properties.setProperty(
@@ -341,11 +329,7 @@ public void shouldConnectWithBasicAuth(String authorizationHeaderValue,
Boolean.toString(useRawAuthHeader)
);
- JavaNetHttpPollingClient pollingClient = setUpPollingClient(
- getBaseUrl(),
- properties,
- setUpGetRequestFactory(properties)
- );
+ JavaNetHttpPollingClient pollingClient = setUpPollingClient();
// WHEN
Collection results = pollingClient.pull(lookupRowData);
@@ -357,7 +341,7 @@ public void shouldConnectWithBasicAuth(String authorizationHeaderValue,
RowData result = results.iterator().next();
assertThat(result.getArity()).isEqualTo(4);
assertThat(result.getString(1)
- .toString()).isEqualTo("Returned HTTP message for parameter PARAM, COUNTER");
+ .toString()).isEqualTo("Returned HTTP message for parameter PARAM, COUNTER");
RowData detailsRow = result.getRow(3, 2);
assertThat(detailsRow.getBoolean(0)).isEqualTo(true);
@@ -370,151 +354,145 @@ private String getBaseUrl() {
return wireMockServer.baseUrl() + ENDPOINT;
}
- public JavaNetHttpPollingClient setUpPollingClient(String url) {
- return setUpPollingClient(url, properties);
- }
-
- public JavaNetHttpPollingClient setUpPollingClient(String url, Properties properties) {
- return setUpPollingClient(url, properties, setUpGetRequestFactory(properties));
+ public JavaNetHttpPollingClient setUpPollingClient() throws ConfigurationException {
+ return setUpPollingClient(setUpGetRequestFactory());
}
- private GetRequestFactory setUpGetRequestFactory(Properties properties) {
+ private GetRequestFactory setUpGetRequestFactory() {
LookupRow lookupRow = new LookupRow()
- .addLookupEntry(
- new RowDataSingleValueLookupSchemaEntry("id",
- RowData.createFieldGetter(
- DataTypes.STRING().getLogicalType(),
- 0)))
- .addLookupEntry(
- new RowDataSingleValueLookupSchemaEntry("uuid",
- RowData.createFieldGetter(
- DataTypes.STRING().getLogicalType(),
- 1))
- );
+ .addLookupEntry(
+ new RowDataSingleValueLookupSchemaEntry("id",
+ RowData.createFieldGetter(
+ DataTypes.STRING().getLogicalType(),
+ 0)))
+ .addLookupEntry(
+ new RowDataSingleValueLookupSchemaEntry("uuid",
+ RowData.createFieldGetter(
+ DataTypes.STRING().getLogicalType(),
+ 1))
+ );
lookupRow.setLookupPhysicalRowDataType(lookupPhysicalDataType);
boolean useRawAuthHeader = Boolean.parseBoolean(
- (String)properties.get(HttpConnectorConfigConstants.LOOKUP_SOURCE_HEADER_USE_RAW));
+ (String) properties.get(HttpConnectorConfigConstants.LOOKUP_SOURCE_HEADER_USE_RAW));
return new GetRequestFactory(
- new GenericGetQueryCreator(lookupRow),
- HttpHeaderUtils.createBasicAuthorizationHeaderPreprocessor(useRawAuthHeader),
- HttpLookupConfig.builder()
- .url(getBaseUrl())
- .properties(properties)
- .build()
+ new GenericGetQueryCreator(lookupRow),
+ HttpHeaderUtils.createBasicAuthorizationHeaderPreprocessor(useRawAuthHeader),
+ HttpLookupConfig.builder()
+ .url(getBaseUrl())
+ .readableConfig(configuration)
+ .properties(properties)
+ .build()
);
}
- private BodyBasedRequestFactory setUpBodyRequestFactory(
- String methodName,
- Properties properties) {
+ private BodyBasedRequestFactory setUpBodyRequestFactory(String methodName) {
SerializationSchema jsonSerializer =
- new JsonFormatFactory()
- .createEncodingFormat(dynamicTableFactoryContext, new Configuration())
- .createRuntimeEncoder(null, lookupPhysicalDataType);
+ new JsonFormatFactory()
+ .createEncodingFormat(dynamicTableFactoryContext, new Configuration())
+ .createRuntimeEncoder(null, lookupPhysicalDataType);
boolean useRawAuthHeader = Boolean.parseBoolean(
- (String)properties.get(HttpConnectorConfigConstants.LOOKUP_SOURCE_HEADER_USE_RAW));
+ (String) properties.get(HttpConnectorConfigConstants.LOOKUP_SOURCE_HEADER_USE_RAW));
return new BodyBasedRequestFactory(
- methodName,
- new GenericJsonQueryCreator(jsonSerializer),
- HttpHeaderUtils.createBasicAuthorizationHeaderPreprocessor(useRawAuthHeader),
- HttpLookupConfig.builder()
- .url(getBaseUrl())
- .properties(properties)
- .build()
+ methodName,
+ new GenericJsonQueryCreator(jsonSerializer),
+ HttpHeaderUtils.createBasicAuthorizationHeaderPreprocessor(useRawAuthHeader),
+ HttpLookupConfig.builder()
+ .url(getBaseUrl())
+ .properties(properties)
+ .build()
);
}
private JavaNetHttpPollingClient setUpPollingClient(
- String url,
- Properties properties,
- HttpRequestFactory requestFactory) {
+ HttpRequestFactory requestFactory) throws ConfigurationException {
HttpLookupConfig lookupConfig = HttpLookupConfig.builder()
- .url(url)
- .properties(properties)
- .httpPostRequestCallback(new Slf4JHttpLookupPostRequestCallback())
- .build();
+ .url(getBaseUrl())
+ .readableConfig(configuration)
+ .properties(properties)
+ .httpPostRequestCallback(new Slf4JHttpLookupPostRequestCallback())
+ .build();
DataType physicalDataType = DataTypes.ROW(
- DataTypes.FIELD("id", DataTypes.STRING()),
- DataTypes.FIELD("msg", DataTypes.STRING()),
- DataTypes.FIELD("uuid", DataTypes.STRING()),
- DataTypes.FIELD("details", DataTypes.ROW(
- DataTypes.FIELD("isActive", DataTypes.BOOLEAN()),
- DataTypes.FIELD("nestedDetails", DataTypes.ROW(
- DataTypes.FIELD("balance", DataTypes.STRING())
+ DataTypes.FIELD("id", DataTypes.STRING()),
+ DataTypes.FIELD("msg", DataTypes.STRING()),
+ DataTypes.FIELD("uuid", DataTypes.STRING()),
+ DataTypes.FIELD("details", DataTypes.ROW(
+ DataTypes.FIELD("isActive", DataTypes.BOOLEAN()),
+ DataTypes.FIELD("nestedDetails", DataTypes.ROW(
+ DataTypes.FIELD("balance", DataTypes.STRING())
+ ))
))
- ))
);
DeserializationSchema schemaDecoder =
- new JsonFormatFactory()
- .createDecodingFormat(dynamicTableFactoryContext, new Configuration())
- .createRuntimeDecoder(dynamicTableSourceContext, physicalDataType);
+ new JsonFormatFactory()
+ .createDecodingFormat(dynamicTableFactoryContext, new Configuration())
+ .createRuntimeDecoder(dynamicTableSourceContext, physicalDataType);
try {
schemaDecoder.open(
- SerializationSchemaUtils.createDeserializationInitContext(
- JavaNetHttpPollingClientConnectionTest.class));
+ SerializationSchemaUtils.createDeserializationInitContext(
+ JavaNetHttpPollingClientConnectionTest.class));
} catch (Exception e) {
throw new RuntimeException("Unable to open schema decoder: " + e.getMessage(), e);
}
JavaNetHttpPollingClientFactory pollingClientFactory =
- new JavaNetHttpPollingClientFactory(requestFactory);
+ new JavaNetHttpPollingClientFactory(requestFactory);
return pollingClientFactory.createPollClient(lookupConfig, schemaDecoder);
}
private StubMapping setUpServerStub(int status) {
return wireMockServer.stubFor(
- get(urlEqualTo(ENDPOINT + "?id=1&uuid=2"))
- .withHeader("Content-Type", equalTo("application/json"))
- .willReturn(
- aResponse()
- .withStatus(status)
- .withBody(readTestFile(SAMPLES_FOLDER + "HttpResult.json"))));
+ get(urlEqualTo(ENDPOINT + "?id=1&uuid=2"))
+ .withHeader("Content-Type", equalTo("application/json"))
+ .willReturn(
+ aResponse()
+ .withStatus(status)
+ .withBody(readTestFile(SAMPLES_FOLDER + "HttpResult.json"))));
}
private StubMapping setUpServerBodyStub(String methodName) {
MappingBuilder methodStub = (methodName.equalsIgnoreCase("PUT") ?
- put(urlEqualTo(ENDPOINT)) :
- post(urlEqualTo(ENDPOINT)));
+ put(urlEqualTo(ENDPOINT)) :
+ post(urlEqualTo(ENDPOINT)));
return wireMockServer.stubFor(
- methodStub
- .withHeader("Content-Type", equalTo("application/json"))
- .withRequestBody(equalToJson("{\"id\" : \"1\", \"uuid\" : \"2\"}"))
- .willReturn(
- aResponse()
- .withStatus(200)
- .withBody(readTestFile(SAMPLES_FOLDER + "HttpResult.json"))));
+ methodStub
+ .withHeader("Content-Type", equalTo("application/json"))
+ .withRequestBody(equalToJson("{\"id\" : \"1\", \"uuid\" : \"2\"}"))
+ .willReturn(
+ aResponse()
+ .withStatus(200)
+ .withBody(readTestFile(SAMPLES_FOLDER + "HttpResult.json"))));
}
private StubMapping setUpServerStubArrayResult(int status) {
return wireMockServer.stubFor(
- get(urlEqualTo(ENDPOINT + "?id=1&uuid=2"))
- .withHeader("Content-Type", equalTo("application/json"))
- .willReturn(
- aResponse()
- .withStatus(status)
- .withBody(readTestFile(SAMPLES_FOLDER_ARRAY_RESULT + "HttpResult.json"))));
+ get(urlEqualTo(ENDPOINT + "?id=1&uuid=2"))
+ .withHeader("Content-Type", equalTo("application/json"))
+ .willReturn(
+ aResponse()
+ .withStatus(status)
+ .withBody(readTestFile(SAMPLES_FOLDER_ARRAY_RESULT + "HttpResult.json"))));
}
private StubMapping setUpServerStubArrayResultWithNulls(int status) {
return wireMockServer.stubFor(
- get(urlEqualTo(ENDPOINT + "?id=1&uuid=2"))
- .withHeader("Content-Type", equalTo("application/json"))
- .willReturn(
- aResponse()
- .withStatus(status)
- .withBody(readTestFile(
- SAMPLES_FOLDER_ARRAY_RESULT_WITH_NULLS + "HttpResult.json"))));
+ get(urlEqualTo(ENDPOINT + "?id=1&uuid=2"))
+ .withHeader("Content-Type", equalTo("application/json"))
+ .willReturn(
+ aResponse()
+ .withStatus(status)
+ .withBody(readTestFile(
+ SAMPLES_FOLDER_ARRAY_RESULT_WITH_NULLS + "HttpResult.json"))));
}
private StubMapping setupServerStubForBasicAuth() {
@@ -522,28 +500,8 @@ private StubMapping setupServerStubForBasicAuth() {
.withHeader("Content-Type", equalTo("application/json"))
.withBasicAuth("user", "password")
.willReturn(
- aResponse()
- .withStatus(200)
- .withBody(readTestFile(SAMPLES_FOLDER + "HttpResult.json"))));
- }
-
- private static Properties prepareErrorCodeProperties(String errorCodeList, String whiteList) {
- Properties properties = new Properties();
- properties.setProperty(
- HttpConnectorConfigConstants.HTTP_ERROR_SOURCE_LOOKUP_CODE_WHITE_LIST,
- whiteList
- );
- properties.setProperty(
- HttpConnectorConfigConstants.HTTP_ERROR_SOURCE_LOOKUP_CODES_LIST,
- errorCodeList
- );
-
- properties.setProperty(
- HttpConnectorConfigConstants.LOOKUP_SOURCE_HEADER_PREFIX + "Content-Type",
- "application/json");
-
- properties.setProperty(RESULT_TYPE, "single-value");
-
- return properties;
+ aResponse()
+ .withStatus(200)
+ .withBody(readTestFile(SAMPLES_FOLDER + "HttpResult.json"))));
}
}
diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientFactoryTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientFactoryTest.java
index 8beaa685..eaa8ab16 100644
--- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientFactoryTest.java
+++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientFactoryTest.java
@@ -2,6 +2,7 @@
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.ConfigurationException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
@@ -18,7 +19,7 @@ public void setUp() {
@Test
@SuppressWarnings("unchecked")
- void shouldCreateClient() {
+ void shouldCreateClient() throws ConfigurationException {
assertThat(
factory.createPollClient(
diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientHttpsConnectionTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientHttpsConnectionTest.java
index 25719e63..69fb7fd5 100644
--- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientHttpsConnectionTest.java
+++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientHttpsConnectionTest.java
@@ -19,6 +19,7 @@
import org.apache.flink.table.factories.DynamicTableFactory.Context;
import org.apache.flink.table.runtime.connector.source.LookupRuntimeProviderContext;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.ConfigurationException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -86,7 +87,7 @@ public void tearDown() {
}
@Test
- public void testHttpsConnectionWithSelfSignedCert() {
+ public void testHttpsConnectionWithSelfSignedCert() throws ConfigurationException {
File keyStoreFile = new File(SERVER_KEYSTORE_PATH);
@@ -107,7 +108,7 @@ public void testHttpsConnectionWithSelfSignedCert() {
@ParameterizedTest
@ValueSource(strings = {"ca.crt", "server.crt"})
- public void testHttpsConnectionWithAddedCerts(String certName) {
+ public void testHttpsConnectionWithAddedCerts(String certName) throws ConfigurationException {
File keyStoreFile = new File(SERVER_KEYSTORE_PATH);
File trustedCert = new File(CERTS_PATH + certName);
@@ -131,7 +132,7 @@ public void testHttpsConnectionWithAddedCerts(String certName) {
@ParameterizedTest
@ValueSource(strings = {"clientPrivateKey.pem", "clientPrivateKey.der"})
- public void testMTlsConnection(String clientPrivateKeyName) {
+ public void testMTlsConnection(String clientPrivateKeyName) throws ConfigurationException {
File keyStoreFile = new File(SERVER_KEYSTORE_PATH);
File trustStoreFile = new File(SERVER_TRUSTSTORE_PATH);
@@ -169,7 +170,7 @@ public void testMTlsConnection(String clientPrivateKeyName) {
}
@Test
- public void testMTlsConnectionUsingKeyStore() {
+ public void testMTlsConnectionUsingKeyStore() throws ConfigurationException {
String password = "password";
String clientKeyStoreName = "client_keyStore.p12";
@@ -209,7 +210,7 @@ public void testMTlsConnectionUsingKeyStore() {
setupAndTestConnection();
}
- private void setupAndTestConnection() {
+ private void setupAndTestConnection() throws ConfigurationException {
// test with basic auth
setupAndTestConnectionWithAuth(
HttpHeaderUtils.createBasicAuthorizationHeaderPreprocessor());
@@ -223,7 +224,7 @@ private void setupAndTestConnection() {
);
}
- private void setupAndTestConnectionWithAuth(HeaderPreprocessor headerPreprocessor) {
+ private void setupAndTestConnectionWithAuth(HeaderPreprocessor headerPreprocessor) throws ConfigurationException {
setUpPollingClientFactory(wireMockServer.baseUrl(),
headerPreprocessor);
testPollingClientConnection();
@@ -260,14 +261,14 @@ public void shouldThrowOnInvalidPath(
assertThrows(RuntimeException.class, () -> setUpPollingClient(properties));
}
- private void testPollingClientConnection() {
+ private void testPollingClientConnection() throws ConfigurationException {
JavaNetHttpPollingClient pollingClient = setUpPollingClient(properties);
Collection result = pollingClient.pull(lookupRowData);
assertResult(result);
}
- private JavaNetHttpPollingClient setUpPollingClient(Properties properties) {
+ private JavaNetHttpPollingClient setUpPollingClient(Properties properties) throws ConfigurationException {
HttpLookupConfig lookupConfig = HttpLookupConfig.builder()
.url("https://localhost:" + HTTPS_SERVER_PORT + ENDPOINT)
diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java
index 1fdcd09d..78490aad 100644
--- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java
+++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientTest.java
@@ -15,6 +15,7 @@
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.ConfigurationException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -57,7 +58,7 @@ public void setUp() {
}
@Test
- public void shouldBuildClientWithoutHeaders() {
+ public void shouldBuildClientWithoutHeaders() throws ConfigurationException {
JavaNetHttpPollingClient client = new JavaNetHttpPollingClient(
httpClient,
@@ -76,7 +77,7 @@ public void shouldBuildClientWithoutHeaders() {
}
@Test
- public void shouldBuildGetClientUri() {
+ public void shouldBuildGetClientUri() throws ConfigurationException {
// GIVEN
JavaNetHttpPollingClient client = new JavaNetHttpPollingClient(
httpClient,
@@ -175,7 +176,7 @@ public void shouldBuildBodyBasedClientUri() {
}
@Test
- public void shouldBuildClientWithHeaders() {
+ public void shouldBuildClientWithHeaders() throws ConfigurationException {
// GIVEN
Properties properties = new Properties();
diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientWithWireTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientWithWireTest.java
index 3c893076..ae4997ab 100644
--- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientWithWireTest.java
+++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClientWithWireTest.java
@@ -2,28 +2,35 @@
import java.io.File;
import java.net.URI;
+import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.time.Duration;
import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
-import com.github.tomakehurst.wiremock.stubbing.StubMapping;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.ConfigurationException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import static com.github.tomakehurst.wiremock.client.WireMock.*;
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.post;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
import com.getindata.connectors.http.internal.HeaderPreprocessor;
import com.getindata.connectors.http.internal.utils.HttpHeaderUtils;
import static com.getindata.connectors.http.TestHelper.readTestFile;
-import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.*;
+import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL;
+import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_OIDC_AUTH_TOKEN_EXPIRY_REDUCTION;
+import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST;
public class JavaNetHttpPollingClientWithWireTest {
private static final String BASE_URL = "http://localhost.com";
@@ -43,6 +50,7 @@ public class JavaNetHttpPollingClientWithWireTest {
private static final String BEARER_REQUEST = "Bearer Dummy";
private WireMockServer wireMockServer;
+
@SuppressWarnings("unchecked")
@BeforeEach
public void setup() {
@@ -79,32 +87,34 @@ public void tearDown() {
@Test
- public void shouldUpdateHttpRequestIfRequiredGet() {
+ public void shouldUpdateHttpRequestIfRequiredGet() throws ConfigurationException {
HttpRequest httpRequest = HttpRequest.newBuilder()
.GET()
.uri(URI.create(BASE_URL))
.timeout(Duration.ofSeconds(1))
- .setHeader("Origin","*")
- .setHeader("X-Content-Type-Options","nosniff")
- .setHeader("Content-Type","application/json")
+ .setHeader("Origin", "*")
+ .setHeader("X-Content-Type-Options", "nosniff")
+ .setHeader("Content-Type", "application/json")
.build();
shouldUpdateHttpRequestIfRequired(httpRequest);
}
+
@Test
- public void shouldUpdateHttpRequestIfRequiredPut() {
+ public void shouldUpdateHttpRequestIfRequiredPut() throws ConfigurationException {
HttpRequest httpRequest = HttpRequest.newBuilder()
- .PUT( HttpRequest.BodyPublishers.ofString("foo"))
+ .PUT(HttpRequest.BodyPublishers.ofString("foo"))
.uri(URI.create(BASE_URL))
.timeout(Duration.ofSeconds(1))
- .setHeader("Origin","*")
- .setHeader("X-Content-Type-Options","nosniff")
- .setHeader("Content-Type","application/json")
+ .setHeader("Origin", "*")
+ .setHeader("X-Content-Type-Options", "nosniff")
+ .setHeader("Content-Type", "application/json")
.build();
shouldUpdateHttpRequestIfRequired(httpRequest);
}
- private void shouldUpdateHttpRequestIfRequired(HttpRequest httpRequest) {
+
+ private void shouldUpdateHttpRequestIfRequired(HttpRequest httpRequest) throws ConfigurationException {
setUpServerBodyStub();
- JavaNetHttpPollingClient client = new JavaNetHttpPollingClient(null,
+ JavaNetHttpPollingClient client = new JavaNetHttpPollingClient(mock(HttpClient.class),
null,
HttpLookupConfig.builder().url(BASE_URL).build(),
null);
@@ -118,11 +128,11 @@ private void shouldUpdateHttpRequestIfRequired(HttpRequest httpRequest) {
HttpRequest newHttpRequest = client.updateHttpRequestIfRequired(request,
oidcHeaderPreProcessor);
assertThat(httpRequest).isEqualTo(newHttpRequest);
- configuration.setString(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL.key(),"http://localhost:9090/auth");
+ configuration.setString(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL.key(), "http://localhost:9090/auth");
configuration.setString(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST, BEARER_REQUEST);
configuration.set(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_EXPIRY_REDUCTION,
Duration.ofSeconds(1L));
- client = new JavaNetHttpPollingClient(null,
+ client = new JavaNetHttpPollingClient(mock(HttpClient.class),
null,
HttpLookupConfig.builder().url(BASE_URL).readableConfig(configuration).build(),
null);
@@ -138,8 +148,8 @@ private void shouldUpdateHttpRequestIfRequired(HttpRequest httpRequest) {
.isEqualTo(newHttpRequest.headers().map().get("Content-Type"));
}
- private StubMapping setUpServerBodyStub() {
- return wireMockServer.stubFor(
+ private void setUpServerBodyStub() {
+ wireMockServer.stubFor(
post(urlEqualTo(ENDPOINT))
.withHeader("Content-Type", equalTo("application/x-www-form-urlencoded"))
.withRequestBody(equalTo(BEARER_REQUEST))
diff --git a/src/test/resources/log4j2-test.properties b/src/test/resources/log4j2-test.properties
deleted file mode 100644
index 11862f58..00000000
--- a/src/test/resources/log4j2-test.properties
+++ /dev/null
@@ -1,25 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-rootLogger.level = INFO
-rootLogger.appenderRef.console.ref = ConsoleAppender
-
-appender.console.name = ConsoleAppender
-appender.console.type = CONSOLE
-appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = %d{HH:mm:ss,SSS} %t %-5p %C{1.} %x - %m%n
diff --git a/src/test/resources/log4j2.properties b/src/test/resources/log4j2.properties
deleted file mode 100644
index 78c5ab61..00000000
--- a/src/test/resources/log4j2.properties
+++ /dev/null
@@ -1,25 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-rootLogger.level = INFO
-rootLogger.appenderRef.console.ref = ConsoleAppender
-
-appender.console.name = ConsoleAppender
-appender.console.type = CONSOLE
-appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = %d{HH:mm:ss,SSS} %t %-5p %-60c %x - %m%n