diff --git a/src/main/java/io/aiven/kafka/connect/http/recordsender/SingleRecordSender.java b/src/main/java/io/aiven/kafka/connect/http/recordsender/SingleRecordSender.java index 735b6374..2eb08367 100644 --- a/src/main/java/io/aiven/kafka/connect/http/recordsender/SingleRecordSender.java +++ b/src/main/java/io/aiven/kafka/connect/http/recordsender/SingleRecordSender.java @@ -32,13 +32,13 @@ protected SingleRecordSender(final HttpSender httpSender) { public void send(final Collection records) { for (final SinkRecord record : records) { final String body = recordValueConverter.convert(record); - httpSender.send(body); + httpSender.send(body, record.key().toString()); } } @Override public void send(final SinkRecord record) { final String body = recordValueConverter.convert(record); - httpSender.send(body); + httpSender.send(body, record.key().toString()); } } diff --git a/src/main/java/io/aiven/kafka/connect/http/sender/AbstractHttpSender.java b/src/main/java/io/aiven/kafka/connect/http/sender/AbstractHttpSender.java index 0fa5f520..15535311 100644 --- a/src/main/java/io/aiven/kafka/connect/http/sender/AbstractHttpSender.java +++ b/src/main/java/io/aiven/kafka/connect/http/sender/AbstractHttpSender.java @@ -47,6 +47,26 @@ protected AbstractHttpSender( this.httpClient = Objects.requireNonNull(httpClient); } + public final HttpResponse send(final String body, final String key) { + if (this.config.httpUri().toString().contains("{key}")) { + final var configOriginalMap = this.config.originalsStrings(); + final String genericUrl = configOriginalMap.get("http.url"); + final String newUrl = genericUrl.replace("{key}", key); + configOriginalMap.replace("http.url", newUrl); + log.debug("Key replaced in URL. New URL: {}", newUrl); + + final var newConfig = new HttpSinkConfig(configOriginalMap); + log.debug("Sending request with body: {}", body); + + final var requestBuilder = + httpRequestBuilder.build(newConfig).POST(HttpRequest.BodyPublishers.ofString(body)); + return sendWithRetries(requestBuilder, HttpResponseHandler.ON_HTTP_ERROR_RESPONSE_HANDLER, + config.maxRetries()); + } else { + return send(body); + } + } + public final HttpResponse send(final String body) { final var requestBuilder = httpRequestBuilder.build(config).POST(HttpRequest.BodyPublishers.ofString(body)); diff --git a/src/main/java/io/aiven/kafka/connect/http/sender/HttpSender.java b/src/main/java/io/aiven/kafka/connect/http/sender/HttpSender.java index 97a7adc6..ce934a3a 100644 --- a/src/main/java/io/aiven/kafka/connect/http/sender/HttpSender.java +++ b/src/main/java/io/aiven/kafka/connect/http/sender/HttpSender.java @@ -20,4 +20,6 @@ public interface HttpSender { HttpResponse send(final String body); + + HttpResponse send(final String body, String key); }