request = webClient.requestAbs(vertxMethod, uri.toString());
+ request.putHeader("User-Agent", userAgent);
+
+ return new RequestWrapper(uri, request, vertxMethod);
}
+ /**
+ * Wrapper for Vert.x HttpRequest.
+ *
+ * Note: This class is stateful and wraps a mutable {@link HttpRequest}.
+ * It is designed to be used for a single request configuration and execution.
+ * Reusing an instance of this class for multiple {@code send()} calls may result in
+ * accumulated headers or query parameters.
+ */
@Getter
@EqualsAndHashCode
private class RequestWrapper implements IRequest {
private final URI uri;
- private final Request request;
+ private final HttpRequest request;
+ private final io.vertx.core.http.HttpMethod method;
private int maxLength = 8 * 1024 * 1024;
private String requestBody;
+ private String requestEncoding;
+ private long currentTimeout = 60000; // Default timeout fallback
+ private final Map> queryParamsMap = new HashMap<>();
- RequestWrapper(URI uri, Request request) {
+ RequestWrapper(URI uri, HttpRequest request, io.vertx.core.http.HttpMethod method) {
this.uri = uri;
this.request = request;
+ this.method = method;
+
+ // Parse initial query params from URI if any
+ String query = uri.getQuery();
+ if (query != null && !query.isEmpty()) {
+ String[] pairs = query.split(KEY_LOGICAL_AND);
+ for (String pair : pairs) {
+ int idx = pair.indexOf(KEY_EQUALS);
+ String key = idx > 0 ? pair.substring(0, idx) : pair;
+ String value = idx > 0 && pair.length() > idx + 1 ? pair.substring(idx + 1) : null;
+
+ if (key != null) key = URLDecoder.decode(key, StandardCharsets.UTF_8);
+ if (value != null) value = URLDecoder.decode(value, StandardCharsets.UTF_8);
+
+ queryParamsMap.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
+ }
+ }
}
@Override
public IRequest setBasicAuthentication(String username, String password, String realm, boolean preemptive) {
- if (preemptive) {
- request.headers(httpFields -> httpFields.add("Authorization", "Basic " + Base64.getEncoder().encodeToString((username + ":" + password).getBytes())));
- } else {
- AuthenticationStore auth = httpClient.getAuthenticationStore();
- auth.addAuthentication(new BasicAuthentication(uri, realm, username, password));
- }
-
- return this;
+ // Vert.x basic auth helper
+ // Note: 'realm' and 'preemptive' parameters are ignored by Vert.x WebClient's basicAuthentication helper.
+ // It automatically sets the Authorization header (equivalent to preemptive=true).
+ // WARNING: Non-preemptive authentication (preemptive=false) is NOT supported in this implementation.
+ request.basicAuthentication(username, password);
+ return this;
}
public IRequest setHttpHeader(String headerName, String value) {
- request.headers(httpFields -> httpFields.add(headerName, value));
+ request.putHeader(headerName, value);
return this;
}
@Override
public IRequest setQueryParam(String key, String value) {
- request.param(key, value);
+ request.addQueryParam(key, value);
+ queryParamsMap.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
return this;
}
@Override
public IRequest setUserAgent(String userAgent) {
- request.agent(userAgent);
+ request.putHeader("User-Agent", userAgent);
return this;
}
@Override
public IRequest setBodyEntity(String content, String encoding, String contentType) {
this.requestBody = content;
- request.body(new StringRequestContent(contentType, this.requestBody, Charset.forName(encoding)));
+ this.requestEncoding = encoding;
+
+ if (contentType != null) {
+ request.putHeader("Content-Type", contentType);
+ }
return this;
}
@Override
public IRequest setMaxResponseSize(int maxLength) {
this.maxLength = maxLength;
-
+ // Note: Vert.x WebClient buffers the entire response by default.
+ // Size limits are validated in handleResponse() after the response is received.
return this;
}
@Override
public IRequest setTimeout(long timeout, TimeUnit timeUnit) {
- request.timeout(timeout, timeUnit);
-
+ long timeoutMillis = timeUnit.toMillis(timeout);
+ this.currentTimeout = timeoutMillis;
+ request.timeout(timeoutMillis);
return this;
}
@Override
public IResponse send() throws HttpRequestException {
+ CompletableFuture future = new CompletableFuture<>();
+
+ doSend(ar -> {
+ if (ar.succeeded()) {
+ future.complete(ar.result());
+ } else {
+ future.completeExceptionally(ar.cause());
+ }
+ });
+
try {
- var listener = new CompletableResponseListener(request, maxLength);
- CompletableFuture completableFuture = listener.send();
- completableFuture.thenApply(ContentResponse::getContentAsString);
- var response = completableFuture.get();
- var responseWrapper = new ResponseWrapper();
- responseWrapper.setContentAsString(response.getContentAsString());
- responseWrapper.setHttpCode(response.getStatus());
- responseWrapper.setHttpCodeMessage(response.getReason());
- responseWrapper.setHttpHeader(convertHeaderToMap(response.getHeaders()));
-
- return responseWrapper;
+ // Use a timeout slightly larger than the request timeout to ensure we don't block indefinitely
+ // if the callback never fires (though Vert.x should handle the timeout).
+ return future.get(currentTimeout + 1000, TimeUnit.MILLISECONDS);
+ } catch (java.util.concurrent.TimeoutException e) {
+ throw new HttpRequestException("Request timed out while waiting for response", e);
} catch (InterruptedException | ExecutionException e) {
- throw new HttpRequestException(e.getLocalizedMessage(), e);
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ Throwable cause = (e instanceof ExecutionException && e.getCause() != null) ? e.getCause() : e;
+ throw new HttpRequestException(cause.getLocalizedMessage(), cause);
}
}
+ private void doSend(io.vertx.core.Handler> handler) {
+ // Buffer entire response in memory; check size limits in handleResponse to mitigate large responses.
+ if (requestBody != null) {
+ Buffer buffer;
+ try {
+ buffer = requestEncoding != null ? Buffer.buffer(requestBody, requestEncoding) : Buffer.buffer(requestBody);
+ } catch (IllegalArgumentException e) {
+ handler.handle(io.vertx.core.Future.failedFuture(new HttpRequestException("Invalid encoding: " + requestEncoding, e)));
+ return;
+ }
+ request.sendBuffer(buffer, ar -> handleResponse(ar, handler));
+ } else {
+ request.send(ar -> handleResponse(ar, handler));
+ }
+ }
+
+ private void handleResponse(io.vertx.core.AsyncResult> ar, io.vertx.core.Handler> handler) {
+ if (ar.succeeded()) {
+ HttpResponse response = ar.result();
+ // Check Content-Length header if available
+ String contentLengthHeader = response.getHeader("Content-Length");
+ if (contentLengthHeader != null) {
+ try {
+ long contentLength = Long.parseLong(contentLengthHeader);
+ if (contentLength > maxLength) {
+ String message = String.format("Response Content-Length %d exceeds maximum allowed length %d", contentLength, maxLength);
+ log.warn(message);
+ handler.handle(io.vertx.core.Future.failedFuture(new IResponse.HttpResponseException(message)));
+ return;
+ }
+ } catch (NumberFormatException e) {
+ // Ignore invalid content-length
+ }
+ }
+
+ Buffer body = response.body();
+ if (body != null && body.length() > maxLength) {
+ String message = String.format("Response body length %d exceeds maximum allowed length %d", body.length(), maxLength);
+ log.warn(message);
+ handler.handle(io.vertx.core.Future.failedFuture(new IResponse.HttpResponseException(message)));
+ return;
+ }
+
+ ResponseWrapper responseWrapper = new ResponseWrapper();
+ if (body != null) {
+ responseWrapper.setContentAsString(response.bodyAsString());
+ } else {
+ responseWrapper.setContentAsString("");
+ }
+
+ responseWrapper.setHttpCode(response.statusCode());
+ responseWrapper.setHttpCodeMessage(response.statusMessage());
+ responseWrapper.setHttpHeader(convertHeaderToMap(response.headers()));
+ handler.handle(io.vertx.core.Future.succeededFuture(responseWrapper));
+ } else {
+ handler.handle(io.vertx.core.Future.failedFuture(ar.cause()));
+ }
+ }
+
@Override
public Map toMap() {
Map map = new HashMap<>();
// Add URI and HTTP method
map.put(KEY_URI, uri.toString());
- map.put(KEY_METHOD, request.getMethod());
+ map.put(KEY_METHOD, method.name());
// Add headers
Map headers = new HashMap<>();
- request.getHeaders().forEach(field -> headers.put(field.getName(), field.getValue()));
+ request.headers().forEach(entry -> headers.put(entry.getKey(), entry.getValue()));
map.put(KEY_HEADERS, headers);
- // Add query parameters by parsing the URI
- Map> queryParams = new HashMap<>();
- String query = uri.getQuery();
- if (query != null && !query.isEmpty()) {
- String[] pairs = query.split(KEY_LOGICAL_AND);
- for (String pair : pairs) {
- int idx = pair.indexOf(KEY_EQUALS);
- String key = idx > 0 ? pair.substring(0, idx) : pair;
- String value = idx > 0 && pair.length() > idx + 1 ? pair.substring(idx + 1) : null;
- queryParams.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
- }
- }
- map.put(KEY_QUERY_PARAMS, queryParams);
+ // Add query parameters
+ map.put(KEY_QUERY_PARAMS, queryParamsMap);
// Add body if present
if (requestBody != null) {
map.put(KEY_BODY, requestBody);
}
- // Add user agent if present
- String userAgent = request.getAgent();
+ // Add user agent
+ String userAgent = request.headers().get("User-Agent");
if (userAgent != null) {
map.put(KEY_USER_AGENT, userAgent);
}
@@ -194,35 +284,35 @@ public Map toMap() {
@Override
public void send(final ICompleteListener completeListener) {
- final BufferingResponseListener responseListener = new BufferingResponseListener(maxLength) {
- @Override
- public void onComplete(final Result result) {
- final Response response = result.getResponse();
- final String content = getContentAsString();
-
+ doSend(ar -> {
+ if (ar.succeeded()) {
try {
- var responseWrapper = new ResponseWrapper();
- responseWrapper.setContentAsString(content);
- responseWrapper.setHttpCode(response.getStatus());
- responseWrapper.setHttpCodeMessage(response.getReason());
- responseWrapper.setHttpHeader(convertHeaderToMap(response.getHeaders()));
-
- completeListener.onComplete(responseWrapper);
+ completeListener.onComplete(ar.result());
} catch (IResponse.HttpResponseException e) {
log.error(e.getLocalizedMessage(), e);
}
+ } else {
+ log.error(ar.cause().getLocalizedMessage(), ar.cause());
+ // Attempt to notify listener of failure via a 503 response.
+ // Strictly speaking ICompleteListener expects a response.
+ ResponseWrapper errorResponse = new ResponseWrapper();
+ errorResponse.setHttpCode(503);
+ errorResponse.setHttpCodeMessage("Service Unavailable: " + ar.cause().getLocalizedMessage());
+ try {
+ completeListener.onComplete(errorResponse);
+ } catch (IResponse.HttpResponseException e) {
+ log.error("Error while calling onComplete with error response", e);
+ }
}
- };
-
- request.send(responseListener);
+ });
}
@Override
public String toString() {
- String requestBody = truncateAndClean(this.requestBody);
+ String requestBodyTruncated = truncateAndClean(this.requestBody);
- return String.format("RequestWrapper{uri=%s, request=%s, requestBody=\"%s\", maxLength=%d, queryParams=%s}",
- uri, request, requestBody, maxLength, request.getParams());
+ return String.format("RequestWrapper{uri=%s, method=%s, requestBody=\"%s\", maxLength=%d, queryParams=%s}",
+ uri, method, requestBodyTruncated, maxLength, queryParamsMap);
}
}
@@ -237,12 +327,12 @@ private static class ResponseWrapper implements IResponse {
@Override
public String toString() {
- String contentAsString = truncateAndClean(this.contentAsString);
+ String contentAsStringTruncated = truncateAndClean(this.contentAsString);
String httpHeaderString = httpHeader != null ? httpHeader.toString() : null;
return String.format("ResponseWrapper{httpCode=%d, httpCodeMessage=\"%s\", responseBody=\"%s\", httpHeader=%s}",
- httpCode, httpCodeMessage, contentAsString, httpHeaderString);
+ httpCode, httpCodeMessage, contentAsStringTruncated, httpHeaderString);
}
}
@@ -259,10 +349,10 @@ private static String truncateAndClean(String text) {
return text;
}
- private static Map convertHeaderToMap(HttpFields headers) {
+ private static Map convertHeaderToMap(MultiMap headers) {
Map httpHeader = new HashMap<>();
- for (HttpField header : headers) {
- httpHeader.put(header.getName(), header.getValue());
+ for (Map.Entry header : headers) {
+ httpHeader.put(header.getKey(), header.getValue());
}
return httpHeader;
}
diff --git a/src/main/java/ai/labs/eddi/engine/httpclient/impl/JettyHttpClient.java b/src/main/java/ai/labs/eddi/engine/httpclient/impl/JettyHttpClient.java
deleted file mode 100644
index 1a43e97a3..000000000
--- a/src/main/java/ai/labs/eddi/engine/httpclient/impl/JettyHttpClient.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package ai.labs.eddi.engine.httpclient.impl;
-
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
-import org.eclipse.jetty.client.HttpClient;
-
-@NoArgsConstructor
-@AllArgsConstructor
-@Getter
-@Setter
-public class JettyHttpClient {
- private HttpClient httpClient;
-}
diff --git a/src/main/java/ai/labs/eddi/engine/httpclient/impl/VertxHttpClient.java b/src/main/java/ai/labs/eddi/engine/httpclient/impl/VertxHttpClient.java
new file mode 100644
index 000000000..b3954db93
--- /dev/null
+++ b/src/main/java/ai/labs/eddi/engine/httpclient/impl/VertxHttpClient.java
@@ -0,0 +1,19 @@
+package ai.labs.eddi.engine.httpclient.impl;
+
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.WebClientSession;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@NoArgsConstructor
+@AllArgsConstructor
+@Getter
+@Setter
+public class VertxHttpClient {
+ private Vertx vertx;
+ private WebClientSession webClient;
+ private WebClient underlyingClient;
+}