-
Notifications
You must be signed in to change notification settings - Fork 50
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Replace key in dynamic endpoint #209
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,13 +32,13 @@ protected SingleRecordSender(final HttpSender httpSender) { | |
public void send(final Collection<SinkRecord> records) { | ||
for (final SinkRecord record : records) { | ||
final String body = recordValueConverter.convert(record); | ||
httpSender.send(body); | ||
httpSender.send(body, record.key().toString()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider having a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, what happens when the key is null? Should it use an empty string? fail? use a fallback value? |
||
} | ||
} | ||
|
||
@Override | ||
public void send(final SinkRecord record) { | ||
final String body = recordValueConverter.convert(record); | ||
httpSender.send(body); | ||
httpSender.send(body, record.key().toString()); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,6 +47,26 @@ protected AbstractHttpSender( | |
this.httpClient = Objects.requireNonNull(httpClient); | ||
} | ||
|
||
public final HttpResponse<String> send(final String body, final String key) { | ||
if (this.config.httpUri().toString().contains("{key}")) { | ||
final var configOriginalMap = this.config.originalsStrings(); | ||
final String genericUrl = configOriginalMap.get("http.url"); | ||
final String newUrl = genericUrl.replace("{key}", key); | ||
configOriginalMap.replace("http.url", newUrl); | ||
log.debug("Key replaced in URL. New URL: {}", newUrl); | ||
|
||
final var newConfig = new HttpSinkConfig(configOriginalMap); | ||
log.debug("Sending request with body: {}", body); | ||
Comment on lines
+52
to
+59
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of modifying the configs on every execution, I'd prefer a more structured approach to flag this.
|
||
|
||
final var requestBuilder = | ||
httpRequestBuilder.build(newConfig).POST(HttpRequest.BodyPublishers.ofString(body)); | ||
return sendWithRetries(requestBuilder, HttpResponseHandler.ON_HTTP_ERROR_RESPONSE_HANDLER, | ||
config.maxRetries()); | ||
} else { | ||
return send(body); | ||
} | ||
} | ||
|
||
public final HttpResponse<String> send(final String body) { | ||
final var requestBuilder = | ||
httpRequestBuilder.build(config).POST(HttpRequest.BodyPublishers.ofString(body)); | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -20,4 +20,6 @@ | |||||
|
||||||
public interface HttpSender { | ||||||
HttpResponse<String> send(final String body); | ||||||
|
||||||
HttpResponse<String> send(final String body, String key); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider passing the key as a function instead of a value:
Suggested change
|
||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This sender needs to know if the URL is configured to inject a key or not. Otherwise, it will wastefully transform a key in to a string, and eventually not use it.
Consider adding a flag that is passed from the configuration to have this condition.