diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
index 695caa2f4672..41ad7c6040b2 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
@@ -38,6 +38,11 @@
nifi-framework-components
2.0.0-SNAPSHOT
+
+ org.apache.nifi
+ nifi-web-client-api
+ 2.0.0-SNAPSHOT
+
org.apache.nifi
nifi-web-client
@@ -95,11 +100,6 @@
2.0.0-SNAPSHOT
-
- org.jetbrains
- annotations
- 24.1.0
-
org.apache.nifi
c2-protocol-component-api
@@ -207,10 +207,6 @@
jakarta.xml.bind
jakarta.xml.bind-api
-
- com.squareup.okhttp3
- okhttp
-
@@ -251,6 +247,18 @@
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+
+
+ true
+
+
+
+
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
index fb7d2ba0bad7..6da275481a36 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
@@ -95,7 +95,6 @@
import org.apache.nifi.cluster.coordination.http.endpoints.UserGroupsEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.UsersEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.VerifyConfigEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.stream.io.NullOutputStream;
import org.apache.nifi.util.FormatUtils;
@@ -104,6 +103,7 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.net.HttpURLConnection;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
@@ -282,7 +282,7 @@ private void drainResponses(final Set responses, final NodeRespons
responses.stream()
.parallel() // "parallelize" the draining of the responses, since we have multiple streams to consume
.filter(response -> response != exclude) // don't include the explicitly excluded node
- .filter(response -> response.getStatus() != RequestReplicator.NODE_CONTINUE_STATUS_CODE) // don't include any continue responses because they contain no content
+ .filter(response -> response.getStatus() != HttpURLConnection.HTTP_ACCEPTED) // don't include any continue responses because they contain no content
.forEach(this::drainResponse); // drain all node responses that didn't get filtered out
}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/HttpReplicationClient.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/HttpReplicationClient.java
index 0571e66e3cc0..1b717361751d 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/HttpReplicationClient.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/HttpReplicationClient.java
@@ -18,6 +18,7 @@
package org.apache.nifi.cluster.coordination.http.replication;
import java.io.IOException;
+import java.net.URI;
import java.util.Map;
import jakarta.ws.rs.core.Response;
@@ -26,6 +27,6 @@ public interface HttpReplicationClient {
PreparedRequest prepareRequest(String method, Map headers, Object entity);
- Response replicate(PreparedRequest request, String uri) throws IOException;
+ Response replicate(PreparedRequest request, URI uri) throws IOException;
}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicationHeader.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicationHeader.java
new file mode 100644
index 000000000000..3da60e5c0692
--- /dev/null
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicationHeader.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.coordination.http.replication;
+
+/**
+ * Enumeration of HTTP headers for Request Replication with lowercasing for compatibility with HTTP/2
+ */
+public enum RequestReplicationHeader {
+ /**
+ * Indicator to cancel transaction processing
+ */
+ CANCEL_TRANSACTION("cancel-transaction"),
+
+ /**
+ * Seed for deterministic cluster identifier generation
+ */
+ CLUSTER_ID_GENERATION_SEED("cluster-id-generation-seed"),
+
+ /**
+ * Indicator to continue transaction processing
+ */
+ EXECUTION_CONTINUE("execution-continue"),
+
+ /**
+ * When replicating a request to the cluster coordinator, it may be useful to denote that the request should
+ * be replicated only to a single node. This happens, for instance, when retrieving a Provenance Event that
+ * we know lives on a specific node. This request must still be replicated through the cluster coordinator.
+ * This header tells the cluster coordinator the UUID's (comma-separated list, possibly with spaces between)
+ * of the nodes that the request should be replicated to.
+ */
+ REPLICATION_TARGET_ID("replication-target-id"),
+
+ /**
+ * When we replicate a request across the cluster, we replicate it only from the cluster coordinator.
+ * If the request needs to be replicated by another node, it first replicates the request to the coordinator,
+ * which then replicates the request on the node's behalf. This header name and value are used to denote
+ * that the request has already been to the cluster coordinator, and the cluster coordinator is the one replicating
+ * the request. This allows us to know that the request should be serviced, rather than proxied back to the
+ * cluster coordinator.
+ */
+ REQUEST_REPLICATED("request-replicated"),
+
+ /**
+ * Transaction Identifier for replicated requests
+ */
+ REQUEST_TRANSACTION_ID("request-transaction-id"),
+
+ /**
+ * The HTTP header that the requestor specifies to ask a node if they are able to process a given request.
+ * The value is always 202-Accepted. The node will respond with 202 ACCEPTED if it is able to
+ * process the request, 417 EXPECTATION_FAILED otherwise.
+ */
+ VALIDATION_EXPECTS("validation-expects");
+
+ private final String header;
+
+ RequestReplicationHeader(final String header) {
+ this.header = header;
+ }
+
+ public String getHeader() {
+ return header;
+ }
+}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
index 1965ad98b645..c6d8bb82a9fe 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
@@ -26,48 +26,6 @@
import java.util.Set;
public interface RequestReplicator {
-
- public static final String REQUEST_TRANSACTION_ID_HEADER = "X-RequestTransactionId";
- public static final String CLUSTER_ID_GENERATION_SEED_HEADER = "X-Cluster-Id-Generation-Seed";
-
- /**
- * The HTTP header that the requestor specifies to ask a node if they are able to process a given request.
- * The value is always 202-Accepted. The node will respond with 202 ACCEPTED if it is able to
- * process the request, 417 EXPECTATION_FAILED otherwise.
- */
- public static final String REQUEST_VALIDATION_HTTP_HEADER = "X-Validation-Expects";
- public static final String NODE_CONTINUE = "202-Accepted";
- public static final int NODE_CONTINUE_STATUS_CODE = 202;
-
- /**
- * Indicates that the request is intended to cancel a transaction that was previously created without performing the action
- */
- public static final String REQUEST_TRANSACTION_CANCELATION_HTTP_HEADER = "X-Cancel-Transaction";
-
- /**
- * Indicates that this is the second phase of the two phase commit and the execution of the action should proceed.
- */
- public static final String REQUEST_EXECUTION_HTTP_HEADER = "X-Execution-Continue";
-
- /**
- * When we replicate a request across the cluster, we replicate it only from the cluster coordinator.
- * If the request needs to be replicated by another node, it first replicates the request to the coordinator,
- * which then replicates the request on the node's behalf. This header name and value are used to denote
- * that the request has already been to the cluster coordinator, and the cluster coordinator is the one replicating
- * the request. This allows us to know that the request should be serviced, rather than proxied back to the
- * cluster coordinator.
- */
- public static final String REPLICATION_INDICATOR_HEADER = "X-Request-Replicated";
-
- /**
- * When replicating a request to the cluster coordinator, it may be useful to denote that the request should
- * be replicated only to a single node. This happens, for instance, when retrieving a Provenance Event that
- * we know lives on a specific node. This request must still be replicated through the cluster coordinator.
- * This header tells the cluster coordinator the UUID's (comma-separated list, possibly with spaces between)
- * of the nodes that the request should be replicated to.
- */
- public static final String REPLICATION_TARGET_NODE_UUID_HEADER = "X-Replication-Target-Id";
-
/**
* Stops the instance from replicating requests. Calling this method on a stopped instance has no effect.
*/
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardUploadRequestReplicator.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardUploadRequestReplicator.java
index 25dfaac4651e..73b93acd9baf 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardUploadRequestReplicator.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardUploadRequestReplicator.java
@@ -153,8 +153,8 @@ private T replicateRequest(final NodeIdentifier nodeId, final UploadRequest<
.uri(requestUri)
.body(inputStream, OptionalLong.of(inputStream.available()))
// Special NiFi-specific headers to indicate that the request should be performed and not replicated to the nodes
- .header(RequestReplicator.REQUEST_EXECUTION_HTTP_HEADER, "true")
- .header(RequestReplicator.REPLICATION_INDICATOR_HEADER, "true")
+ .header(RequestReplicationHeader.EXECUTION_CONTINUE.getHeader(), Boolean.TRUE.toString())
+ .header(RequestReplicationHeader.REQUEST_REPLICATED.getHeader(), Boolean.TRUE.toString())
.header(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN, ProxiedEntitiesUtils.buildProxiedEntitiesChainString(user))
.header(ProxiedEntitiesUtils.PROXY_ENTITY_GROUPS, ProxiedEntitiesUtils.buildProxiedEntityGroupsString(user.getIdentityProviderGroups()));
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
index ab150aa5319d..ae4344af3232 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -61,6 +61,7 @@
import java.io.Closeable;
import java.io.IOException;
+import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
@@ -98,6 +99,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator, Closeable
private static final String COOKIE_HEADER = "Cookie";
private static final String HOST_HEADER = "Host";
+ private static final String NODE_CONTINUE = "202-Accepted";
private final int maxConcurrentRequests; // maximum number of concurrent requests
private final HttpResponseMapper responseMapper;
@@ -276,9 +278,9 @@ public AsyncClusterResponse replicate(Set nodeIds, final NiFiUse
final boolean indicateReplicated, final boolean performVerification) {
final Map updatedHeaders = new HashMap<>(headers);
- updatedHeaders.put(RequestReplicator.CLUSTER_ID_GENERATION_SEED_HEADER, ComponentIdGenerator.generateId().toString());
+ updatedHeaders.put(RequestReplicationHeader.CLUSTER_ID_GENERATION_SEED.getHeader(), ComponentIdGenerator.generateId().toString());
if (indicateReplicated) {
- updatedHeaders.put(RequestReplicator.REPLICATION_INDICATOR_HEADER, "true");
+ updatedHeaders.put(RequestReplicationHeader.REQUEST_REPLICATED.getHeader(), Boolean.TRUE.toString());
}
// include the proxied entities header
@@ -380,7 +382,7 @@ AsyncClusterResponse replicate(final Set nodeIds, final String m
// Update headers to indicate the current revision so that we can
// prevent multiple users changing the flow at the same time
final Map updatedHeaders = new HashMap<>(headers);
- final String requestId = updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID_HEADER, key -> UUID.randomUUID().toString());
+ final String requestId = computeRequestId(updatedHeaders);
long verifyClusterStateNanos = -1;
if (performVerification) {
@@ -458,7 +460,7 @@ AsyncClusterResponse replicate(final Set nodeIds, final String m
// instruct the node to actually perform the underlying action
if (mutableRequest && executionPhase) {
- updatedHeaders.put(REQUEST_EXECUTION_HTTP_HEADER, "true");
+ updatedHeaders.put(RequestReplicationHeader.EXECUTION_CONTINUE.getHeader(), "true");
}
// replicate the request to all nodes
@@ -486,13 +488,16 @@ AsyncClusterResponse replicate(final Set nodeIds, final String m
}
}
+ private String computeRequestId(final Map headers) {
+ return headers.computeIfAbsent(RequestReplicationHeader.REQUEST_TRANSACTION_ID.getHeader(), header -> UUID.randomUUID().toString());
+ }
private void performVerification(final Set nodeIds, final String method, final URI uri, final Object entity, final Map headers,
final StandardAsyncClusterResponse clusterResponse, final boolean merge, final Object monitor) {
logger.debug("Verifying that mutable request {} {} can be made", method, uri.getPath());
final Map validationHeaders = new HashMap<>(headers);
- validationHeaders.put(REQUEST_VALIDATION_HTTP_HEADER, NODE_CONTINUE);
+ validationHeaders.put(RequestReplicationHeader.VALIDATION_EXPECTS.getHeader(), NODE_CONTINUE);
final long startNanos = System.nanoTime();
final int numNodes = nodeIds.size();
@@ -523,7 +528,7 @@ public void onCompletion(final NodeResponse nodeResponse) {
clusterResponse.addTiming("Verification Completed", "All Nodes", nanos);
// Check if we have any requests that do not have a 202-Accepted status code.
- final long dissentingCount = nodeResponses.stream().filter(p -> p.getStatus() != NODE_CONTINUE_STATUS_CODE).count();
+ final long dissentingCount = nodeResponses.stream().filter(p -> p.getStatus() != HttpURLConnection.HTTP_ACCEPTED).count();
// If all nodes responded with 202-Accepted, then we can replicate the original request
// to all nodes and we are finished.
@@ -535,7 +540,7 @@ public void onCompletion(final NodeResponse nodeResponse) {
try {
final Map cancelLockHeaders = new HashMap<>(headers);
- cancelLockHeaders.put(REQUEST_TRANSACTION_CANCELATION_HTTP_HEADER, "true");
+ cancelLockHeaders.put(RequestReplicationHeader.CANCEL_TRANSACTION.getHeader(), "true");
final Thread cancelLockThread = new Thread(new Runnable() {
@Override
public void run() {
@@ -554,7 +559,7 @@ public void run() {
// Add a NodeResponse for each node to the Cluster Response
// Check that all nodes responded successfully.
for (final NodeResponse response : nodeResponses) {
- if (response.getStatus() != NODE_CONTINUE_STATUS_CODE) {
+ if (response.getStatus() != HttpURLConnection.HTTP_ACCEPTED) {
final Response clientResponse = response.getClientResponse();
final String message;
@@ -645,7 +650,7 @@ protected NodeResponse replicateRequest(final PreparedRequest request, final Nod
logger.debug("Replicating request to {} {}, request ID = {}, headers = {}", request.getMethod(), uri, requestId, request.getHeaders());
// invoke the request
- response = httpClient.replicate(request, uri.toString());
+ response = httpClient.replicate(request, uri);
final long nanos = System.nanoTime() - startNanos;
clusterResponse.addTiming("Perform HTTP Request", nodeId.toString(), nanos);
@@ -866,7 +871,7 @@ public void run() {
try {
// create and send the request
- final String requestId = request.getHeaders().get("x-nifi-request-id");
+ final String requestId = request.getHeaders().get(RequestReplicationHeader.REQUEST_TRANSACTION_ID.getHeader());
logger.debug("Replicating request {} {} to {}", method, uri.getPath(), nodeId);
nodeResponse = replicateRequest(request, nodeId, uri, requestId, clusterResponse);
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/client/PreparedRequestHeader.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/client/PreparedRequestHeader.java
new file mode 100644
index 000000000000..09b175b43767
--- /dev/null
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/client/PreparedRequestHeader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.coordination.http.replication.client;
+
+/**
+ * Enumeration of HTTP headers for preparing replicated requests
+ */
+enum PreparedRequestHeader {
+ ACCEPT_ENCODING("accept-encoding"),
+
+ CONTENT_ENCODING("content-encoding"),
+
+ CONTENT_LENGTH("content-length"),
+
+ CONTENT_TYPE("content-type"),
+
+ USER_AGENT("user-agent");
+
+ private final String header;
+
+ PreparedRequestHeader(final String header) {
+ this.header = header;
+ }
+
+ String getHeader() {
+ return header;
+ }
+}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/client/StandardHttpReplicationClient.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/client/StandardHttpReplicationClient.java
new file mode 100644
index 000000000000..522f235f87a3
--- /dev/null
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/client/StandardHttpReplicationClient.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.coordination.http.replication.client;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jakarta.xmlbind.JakartaXmlBindAnnotationIntrospector;
+import jakarta.ws.rs.core.MultivaluedHashMap;
+import jakarta.ws.rs.core.MultivaluedMap;
+import jakarta.ws.rs.core.Response;
+
+import org.apache.nifi.cluster.coordination.http.replication.HttpReplicationClient;
+import org.apache.nifi.cluster.coordination.http.replication.PreparedRequest;
+import org.apache.nifi.cluster.coordination.http.replication.io.EntitySerializer;
+import org.apache.nifi.cluster.coordination.http.replication.io.JacksonResponse;
+import org.apache.nifi.cluster.coordination.http.replication.io.JsonEntitySerializer;
+import org.apache.nifi.cluster.coordination.http.replication.io.XmlEntitySerializer;
+import org.apache.nifi.web.client.api.HttpEntityHeaders;
+import org.apache.nifi.web.client.api.HttpRequestBodySpec;
+import org.apache.nifi.web.client.api.HttpRequestMethod;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.HttpUriBuilder;
+import org.apache.nifi.web.client.api.WebClientService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.zip.GZIPInputStream;
+
+/**
+ * Standard HTTP Replication Client based on Web Client Service
+ */
+public class StandardHttpReplicationClient implements HttpReplicationClient {
+ private static final Set REQUEST_BODY_METHODS = Set.of("PATCH", "POST", "PUT");
+
+ private static final Set DISALLOWED_HEADERS = Set.of("connection", "content-length", "expect", "host", "upgrade");
+
+ private static final char PSEUDO_HEADER_PREFIX = ':';
+
+ private static final String GZIP_ENCODING = "gzip";
+
+ private static final String QUERY_SEPARATOR = "&";
+
+ private static final String QUERY_NAME_VALUE_SEPARATOR = "=";
+
+ private static final String APPLICATION_JSON_CONTENT_TYPE = "application/json";
+
+ private static final String APPLICATION_XML_CONTENT_TYPE = "application/xml";
+
+ private static final String USER_AGENT_PRODUCT = "Apache NiFi";
+
+ private static final String USER_AGENT_FORMAT = "%s/%s";
+
+ private static final String USER_AGENT_VERSION = "SNAPSHOT";
+
+ private static final String USER_AGENT;
+
+ private static final Logger logger = LoggerFactory.getLogger(StandardHttpReplicationClient.class);
+
+ static {
+ final Package clientPackage = StandardHttpReplicationClient.class.getPackage();
+ final String userAgentVersion;
+ if (clientPackage == null || clientPackage.getImplementationVersion() == null) {
+ userAgentVersion = USER_AGENT_VERSION;
+ } else {
+ // Set User Agent Version from JAR MANIFEST.MF Version when found
+ userAgentVersion = clientPackage.getImplementationVersion();
+ }
+ USER_AGENT = USER_AGENT_FORMAT.formatted(USER_AGENT_PRODUCT, userAgentVersion);
+ }
+
+ private final WebClientService webClientService;
+
+ private final Supplier httpUriBuilderSupplier;
+
+ private final EntitySerializer jsonSerializer;
+
+ private final EntitySerializer xmlSerializer;
+
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ public StandardHttpReplicationClient(final WebClientService webClientService, final Supplier httpUriBuilderSupplier) {
+ this.webClientService = Objects.requireNonNull(webClientService, "Web Client Service required");
+ this.httpUriBuilderSupplier = Objects.requireNonNull(httpUriBuilderSupplier, "HTTP URI Builder supplier required");
+
+ objectMapper.setDefaultPropertyInclusion(JsonInclude.Value.construct(JsonInclude.Include.NON_NULL, JsonInclude.Include.ALWAYS));
+ objectMapper.setAnnotationIntrospector(new JakartaXmlBindAnnotationIntrospector(objectMapper.getTypeFactory()));
+
+ jsonSerializer = new JsonEntitySerializer(objectMapper);
+ xmlSerializer = new XmlEntitySerializer();
+ }
+
+ /**
+ * Prepare Request for Replication with serialized Request Entity
+ *
+ * @param method HTTP Method
+ * @param headers HTTP Request Headers
+ * @param requestEntity Request Entity to be serialized
+ * @return Prepared Request for replication
+ */
+ @Override
+ public PreparedRequest prepareRequest(final String method, final Map headers, final Object requestEntity) {
+ final Map preparedHeaders = getPreparedHeaders(headers, method);
+ final byte[] requestBody = getRequestBody(requestEntity, preparedHeaders);
+ return new StandardPreparedRequest(method, preparedHeaders, requestEntity, requestBody);
+ }
+
+ /**
+ * Replicate Prepared HTTP Request to destination URI
+ *
+ * @param request Prepared HTTP Request for replication
+ * @param uri Destination URI for sending the request
+ * @return Jakarta REST Response
+ * @throws IOException Thrown on communication failures sending requests or retrieving responses
+ */
+ @Override
+ public Response replicate(final PreparedRequest request, final URI uri) throws IOException {
+ if (request instanceof StandardPreparedRequest preparedRequest) {
+ return replicate(preparedRequest, uri);
+ } else {
+ throw new IllegalArgumentException("HTTP Prepared Request not provided");
+ }
+ }
+
+ private Map getPreparedHeaders(final Map headers, final String method) {
+ final Map preparedHeaders = new LinkedHashMap<>();
+
+ for (final Map.Entry header : headers.entrySet()) {
+ final String headerName = header.getKey().toLowerCase();
+ if (PreparedRequestHeader.ACCEPT_ENCODING.getHeader().equals(headerName)) {
+ // Remove Accept-Encoding from original client request in favor of specific value for replication
+ continue;
+ }
+
+ final String headerValue = header.getValue();
+ preparedHeaders.put(headerName, headerValue);
+ }
+
+ // Set Accept-Encoding to request gzip encoded responses
+ preparedHeaders.put(PreparedRequestHeader.ACCEPT_ENCODING.getHeader(), GZIP_ENCODING);
+
+ processContentType(method, preparedHeaders);
+ processUserAgent(preparedHeaders);
+ return preparedHeaders;
+ }
+
+ private Response replicate(final StandardPreparedRequest preparedRequest, final URI location) throws IOException {
+ final HttpRequestMethod requestMethod = getRequestMethod(preparedRequest);
+ final URI requestUri = getRequestUri(preparedRequest, location);
+
+ final HttpRequestBodySpec httpRequestBodySpec = webClientService.method(requestMethod).uri(requestUri);
+
+ final Map requestHeaders = preparedRequest.headers();
+ for (final Map.Entry requestHeader : requestHeaders.entrySet()) {
+ final String headerName = requestHeader.getKey();
+ final String headerNameLowerCased = headerName.toLowerCase();
+ if (!DISALLOWED_HEADERS.contains(headerNameLowerCased)) {
+ httpRequestBodySpec.header(headerName, requestHeader.getValue());
+ }
+ }
+
+ if (REQUEST_BODY_METHODS.contains(requestMethod.getMethod())) {
+ final byte[] requestBody = preparedRequest.requestBody();
+ final ByteArrayInputStream body = new ByteArrayInputStream(requestBody);
+ final OptionalLong contentLength = OptionalLong.of(requestBody.length);
+ httpRequestBodySpec.body(body, contentLength);
+ }
+
+ return replicate(httpRequestBodySpec, preparedRequest.method(), location);
+ }
+
+ private Response replicate(final HttpRequestBodySpec httpRequestBodySpec, final String method, final URI location) throws IOException {
+ final long started = System.currentTimeMillis();
+
+ try (HttpResponseEntity responseEntity = httpRequestBodySpec.retrieve()) {
+ final int statusCode = responseEntity.statusCode();
+ final HttpEntityHeaders headers = responseEntity.headers();
+ final MultivaluedMap responseHeaders = getResponseHeaders(headers);
+ final byte[] responseBody = getResponseBody(responseEntity.body(), headers);
+
+ final long elapsed = System.currentTimeMillis() - started;
+ logger.debug("Replicated {} {} HTTP {} in {} ms", method, location, statusCode, elapsed);
+
+ return new JacksonResponse(objectMapper, responseBody, responseHeaders, location, statusCode, null);
+ }
+ }
+
+ private URI getRequestUri(final StandardPreparedRequest preparedRequest, final URI location) {
+ final HttpUriBuilder httpUriBuilder = httpUriBuilderSupplier.get();
+
+ httpUriBuilder.scheme(location.getScheme());
+ httpUriBuilder.host(location.getHost());
+ httpUriBuilder.port(location.getPort());
+ httpUriBuilder.encodedPath(location.getPath());
+
+ final String query = location.getQuery();
+ if (query != null) {
+ final String[] parameters = query.split(QUERY_SEPARATOR);
+ for (final String parameter : parameters) {
+ final String[] parameterNameValue = parameter.split(QUERY_NAME_VALUE_SEPARATOR);
+ if (parameterNameValue.length == 1) {
+ final String parameterName = parameterNameValue[0];
+ httpUriBuilder.addQueryParameter(parameterName, null);
+ } else if (parameterNameValue.length == 2) {
+ final String parameterName = parameterNameValue[0];
+ final String parameterValue = parameterNameValue[1];
+ httpUriBuilder.addQueryParameter(parameterName, parameterValue);
+ }
+ }
+ }
+
+ final Object requestEntity = preparedRequest.entity();
+ if (requestEntity instanceof MultivaluedMap, ?> parameterEntity) {
+ for (final Object key : parameterEntity.keySet()) {
+ final String parameterName = key.toString();
+ final Object parameterValues = parameterEntity.get(parameterName);
+ if (parameterValues instanceof List> values) {
+ for (final Object value : values) {
+ httpUriBuilder.addQueryParameter(parameterName, value.toString());
+ }
+ }
+ }
+ }
+
+ return httpUriBuilder.build();
+ }
+
+ private HttpRequestMethod getRequestMethod(final PreparedRequest preparedRequest) {
+ final String method = preparedRequest.getMethod();
+ return new HttpRequestMethod() {
+ @Override
+ public String getMethod() {
+ return method;
+ }
+
+ @Override
+ public String toString() {
+ return method;
+ }
+ };
+ }
+
+ private MultivaluedMap getResponseHeaders(final HttpEntityHeaders responseHeaders) {
+ final MultivaluedMap headers = new MultivaluedHashMap<>();
+ for (final String name : responseHeaders.getHeaderNames()) {
+ // Remove pseudo-headers returned from HTTP/2 responses
+ if (name.charAt(0) == PSEUDO_HEADER_PREFIX) {
+ continue;
+ }
+ // Remove Content-Encoding Response Header to align with gzip decoding of Response Body
+ if (PreparedRequestHeader.CONTENT_ENCODING.getHeader().equalsIgnoreCase(name)) {
+ continue;
+ }
+ // Remove Content-Length Response Header to align with gzip decoding of Response Body
+ if (PreparedRequestHeader.CONTENT_LENGTH.getHeader().equalsIgnoreCase(name)) {
+ continue;
+ }
+ final List values = responseHeaders.getHeader(name);
+ headers.addAll(name, values);
+ }
+ return headers;
+ }
+
+ private byte[] getResponseBody(final InputStream inputStream, final HttpEntityHeaders responseHeaders) throws IOException {
+ final boolean gzipEncoded = isGzipEncoded(responseHeaders);
+
+ final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ try (InputStream responseBodyStream = gzipEncoded ? new GZIPInputStream(inputStream) : inputStream) {
+ responseBodyStream.transferTo(outputStream);
+ }
+ return outputStream.toByteArray();
+ }
+
+ private byte[] getRequestBody(final Object requestEntity, final Map headers) {
+ final Optional contentTypeFound = getContentType(headers);
+ final String contentType = contentTypeFound.orElse(APPLICATION_JSON_CONTENT_TYPE);
+ final EntitySerializer serializer = getSerializer(contentType);
+
+ final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ try {
+ serializer.serialize(requestEntity, outputStream);
+ } catch (final IOException e) {
+ throw new UncheckedIOException("Request Entity serialization failed", e);
+ }
+
+ return outputStream.toByteArray();
+ }
+
+ private void processContentType(final String method, final Map headers) {
+ if (REQUEST_BODY_METHODS.contains(method)) {
+ final Optional contentTypeHeaderFound = getHeaderName(headers, PreparedRequestHeader.CONTENT_TYPE);
+ if (contentTypeHeaderFound.isEmpty()) {
+ // Set default Content-Type to JSON
+ headers.put(PreparedRequestHeader.CONTENT_TYPE.getHeader(), APPLICATION_JSON_CONTENT_TYPE);
+ }
+ }
+ }
+
+ private void processUserAgent(final Map headers) {
+ final Optional userAgentHeaderFound = getHeaderName(headers, PreparedRequestHeader.USER_AGENT);
+ final String userAgentHeader = userAgentHeaderFound.orElseGet(PreparedRequestHeader.USER_AGENT::getHeader);
+ headers.put(userAgentHeader, USER_AGENT);
+ }
+
+ private EntitySerializer getSerializer(final String contentType) {
+ final EntitySerializer serializer;
+
+ if (APPLICATION_XML_CONTENT_TYPE.equalsIgnoreCase(contentType)) {
+ serializer = xmlSerializer;
+ } else {
+ serializer = jsonSerializer;
+ }
+
+ return serializer;
+ }
+
+ private boolean isGzipEncoded(final HttpEntityHeaders headers) {
+ final Optional contentEncodingFound = headers.getHeaderNames()
+ .stream()
+ .filter(PreparedRequestHeader.CONTENT_ENCODING.getHeader()::equalsIgnoreCase)
+ .map(headers::getFirstHeader)
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .findFirst();
+
+ return contentEncodingFound.map(GZIP_ENCODING::equalsIgnoreCase).orElse(false);
+ }
+
+ private Optional getContentType(final Map headers) {
+ final Optional headerNameFound = getHeaderName(headers, PreparedRequestHeader.CONTENT_TYPE);
+
+ final String header;
+ if (headerNameFound.isPresent()) {
+ final String name = headerNameFound.get();
+ header = headers.get(name);
+ } else {
+ header = null;
+ }
+
+ return Optional.ofNullable(header);
+ }
+
+ private Optional getHeaderName(final Map headers, final PreparedRequestHeader httpHeader) {
+ return headers.keySet()
+ .stream()
+ .filter(httpHeader.getHeader()::equalsIgnoreCase)
+ .findFirst();
+ }
+}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpPreparedRequest.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/client/StandardPreparedRequest.java
similarity index 58%
rename from nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpPreparedRequest.java
rename to nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/client/StandardPreparedRequest.java
index 9b9a699edc6a..96006cc131a5 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpPreparedRequest.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/client/StandardPreparedRequest.java
@@ -15,26 +15,21 @@
* limitations under the License.
*/
-package org.apache.nifi.cluster.coordination.http.replication.okhttp;
-
-import java.util.Map;
+package org.apache.nifi.cluster.coordination.http.replication.client;
import org.apache.nifi.cluster.coordination.http.replication.PreparedRequest;
-import okhttp3.RequestBody;
-
-public class OkHttpPreparedRequest implements PreparedRequest {
- private final String method;
- private final Map headers;
- private final Object entity;
- private final RequestBody requestBody;
+import java.util.Map;
- public OkHttpPreparedRequest(final String method, final Map headers, final Object entity, final RequestBody requestBody) {
- this.method = method;
- this.headers = headers;
- this.entity = entity;
- this.requestBody = requestBody;
- }
+/**
+ * Standard record implementation of Request prepared for Replication
+ *
+ * @param method HTTP Method
+ * @param headers Map of HTTP Request Headers
+ * @param entity HTTP Request Entity
+ * @param requestBody Serialized Request Body
+ */
+record StandardPreparedRequest(String method, Map headers, Object entity, byte[] requestBody) implements PreparedRequest {
@Override
public String getMethod() {
@@ -50,13 +45,4 @@ public Map getHeaders() {
public Object getEntity() {
return entity;
}
-
- public RequestBody getRequestBody() {
- return requestBody;
- }
-
- @Override
- public String toString() {
- return "OkHttpPreparedRequest[method=" + method + ", headers=" + headers + "]";
- }
}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/EntitySerializer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/io/EntitySerializer.java
similarity index 93%
rename from nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/EntitySerializer.java
rename to nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/io/EntitySerializer.java
index 19028e393c43..703503aa26b6 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/EntitySerializer.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/io/EntitySerializer.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.nifi.cluster.coordination.http.replication.okhttp;
+package org.apache.nifi.cluster.coordination.http.replication.io;
import java.io.IOException;
import java.io.OutputStream;
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/JacksonResponse.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/io/JacksonResponse.java
similarity index 98%
rename from nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/JacksonResponse.java
rename to nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/io/JacksonResponse.java
index be7b1eebb4d3..7a65846bfcb5 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/JacksonResponse.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/io/JacksonResponse.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.nifi.cluster.coordination.http.replication.okhttp;
+package org.apache.nifi.cluster.coordination.http.replication.io;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/JsonEntitySerializer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/io/JsonEntitySerializer.java
similarity index 95%
rename from nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/JsonEntitySerializer.java
rename to nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/io/JsonEntitySerializer.java
index d65229143020..8884bc07fa07 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/JsonEntitySerializer.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/io/JsonEntitySerializer.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.nifi.cluster.coordination.http.replication.okhttp;
+package org.apache.nifi.cluster.coordination.http.replication.io;
import java.io.IOException;
import java.io.OutputStream;
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/XmlEntitySerializer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/io/XmlEntitySerializer.java
similarity index 97%
rename from nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/XmlEntitySerializer.java
rename to nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/io/XmlEntitySerializer.java
index e8a005cc4a6a..122131a95f5a 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/XmlEntitySerializer.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/io/XmlEntitySerializer.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.nifi.cluster.coordination.http.replication.okhttp;
+package org.apache.nifi.cluster.coordination.http.replication.io;
import java.io.IOException;
import java.io.OutputStream;
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/CallEventListener.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/CallEventListener.java
deleted file mode 100644
index 2ca859614738..000000000000
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/CallEventListener.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.cluster.coordination.http.replication.okhttp;
-
-import okhttp3.Call;
-
-import java.net.SocketAddress;
-import java.text.NumberFormat;
-import java.util.HashMap;
-import java.util.Map;
-
-public class CallEventListener {
- private final Call call;
- private final Map dnsTimings = new HashMap<>();
- private final Map establishConnectionTiming = new HashMap<>();
- private long callStart;
- private long callEnd;
- private long responseBodyStart;
- private long responseBodyEnd;
- private long responseHeaderStart;
- private long responseHeaderEnd;
- private long requestHeaderStart;
- private long requestHeaderEnd;
- private long requestBodyStart;
- private long requestBodyEnd;
- private long secureConnectStart;
- private long secureConnectEnd;
-
-
- public CallEventListener(final Call call) {
- this.call = call;
- }
-
- public void callStart() {
- callStart = System.nanoTime();
- }
-
- public void callEnd() {
- callEnd = System.nanoTime();
- }
-
- public void dnsStart(final String domainName) {
- dnsTimings.computeIfAbsent(domainName, k -> new Timing(domainName)).start();
- }
-
- public void dnsEnd(final String domainName) {
- dnsTimings.computeIfAbsent(domainName, k -> new Timing(domainName)).end();
- }
-
- public void responseBodyStart() {
- responseBodyStart = System.nanoTime();
- }
-
- public void responseBodyEnd() {
- responseBodyEnd = System.nanoTime();
- }
-
- public void responseHeaderStart() {
- responseHeaderStart = System.nanoTime();
- }
-
- public void responseHeaderEnd() {
- responseHeaderEnd = System.nanoTime();
- }
-
- public void requestHeaderStart() {
- requestHeaderStart = System.nanoTime();
- }
-
- public void requestHeaderEnd() {
- requestHeaderEnd = System.nanoTime();
- }
-
- public void requestBodyStart() {
- requestBodyStart = System.nanoTime();
- }
-
- public void requestBodyEnd() {
- requestBodyEnd = System.nanoTime();
- }
-
- public void connectStart(final SocketAddress address) {
- establishConnectionTiming.computeIfAbsent(address.toString(), Timing::new).start();
- }
-
- public void connectionAcquired(final SocketAddress address) {
- establishConnectionTiming.computeIfAbsent(address.toString(), Timing::new).end();
- }
-
- public void secureConnectStart() {
- secureConnectStart = System.nanoTime();
- }
-
- public void secureConnectEnd() {
- secureConnectEnd = System.nanoTime();
- }
-
- public Call getCall() {
- return call;
- }
-
- @Override
- public String toString() {
- final NumberFormat numberFormat = NumberFormat.getInstance();
-
- return "CallEventListener{" +
- "url=" + call.request().url() +
- ", dnsTimings=" + dnsTimings.values() +
- ", establishConnectionTiming=" + establishConnectionTiming.values() +
- ", tlsInitialization=" + numberFormat.format(secureConnectEnd - secureConnectStart) + " nanos" +
- ", writeRequestHeaders=" + numberFormat.format(requestHeaderEnd - requestHeaderStart) + " nanos" +
- ", writeRequestBody=" + numberFormat.format(requestBodyEnd - requestBodyStart) + " nanos" +
- ", readResponseHeaders=" + numberFormat.format(responseHeaderEnd - responseHeaderStart) + " nanos" +
- ", readResponseBody=" + numberFormat.format(responseBodyEnd - responseBodyStart) + " nanos" +
- ", callTime=" + numberFormat.format(callEnd - callStart) + " nanos" +
- '}';
- }
-
- private static class Timing {
- private final String address;
- private long start;
- private long nanos;
-
- public Timing(final String address) {
- this.address = address;
- }
-
- public String getAddress() {
- return address;
- }
-
- public void start() {
- start = System.nanoTime();
- }
-
- public void end() {
- if (start > 0) {
- nanos += (System.nanoTime() - start);
- }
- }
-
- public String toString() {
- return "{address=" + address + ", nanos=" + NumberFormat.getInstance().format(nanos) + "}";
- }
- }
-}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java
deleted file mode 100644
index 91b7532b9d24..000000000000
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/OkHttpReplicationClient.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.cluster.coordination.http.replication.okhttp;
-
-import com.fasterxml.jackson.annotation.JsonInclude.Include;
-import com.fasterxml.jackson.annotation.JsonInclude.Value;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.module.jakarta.xmlbind.JakartaXmlBindAnnotationIntrospector;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import java.util.zip.GZIPInputStream;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLSocketFactory;
-import javax.net.ssl.X509TrustManager;
-import jakarta.ws.rs.HttpMethod;
-import jakarta.ws.rs.core.MultivaluedHashMap;
-import jakarta.ws.rs.core.MultivaluedMap;
-import jakarta.ws.rs.core.Response;
-import okhttp3.Call;
-import okhttp3.ConnectionPool;
-import okhttp3.Headers;
-import okhttp3.HttpUrl;
-import okhttp3.MediaType;
-import okhttp3.OkHttpClient;
-import okhttp3.Request;
-import okhttp3.RequestBody;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.cluster.coordination.http.replication.HttpReplicationClient;
-import org.apache.nifi.cluster.coordination.http.replication.PreparedRequest;
-import org.apache.nifi.remote.protocol.http.HttpHeaders;
-import org.apache.nifi.stream.io.GZIPOutputStream;
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.NiFiProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.util.StreamUtils;
-
-public class OkHttpReplicationClient implements HttpReplicationClient {
- private static final Logger logger = LoggerFactory.getLogger(OkHttpReplicationClient.class);
- private static final Set gzipEncodings = Stream.of("gzip", "x-gzip").collect(Collectors.toSet());
-
- private final EntitySerializer jsonSerializer;
- private final EntitySerializer xmlSerializer;
-
- private final ObjectMapper jsonCodec = new ObjectMapper();
- private final OkHttpClient okHttpClient;
- private final SSLContext sslContext;
- private final X509TrustManager trustManager;
-
- public OkHttpReplicationClient(
- final NiFiProperties properties,
- final SSLContext sslContext,
- final X509TrustManager trustManager
- ) {
- jsonCodec.setDefaultPropertyInclusion(Value.construct(Include.NON_NULL, Include.ALWAYS));
- jsonCodec.setAnnotationIntrospector(new JakartaXmlBindAnnotationIntrospector(jsonCodec.getTypeFactory()));
-
- jsonSerializer = new JsonEntitySerializer(jsonCodec);
- xmlSerializer = new XmlEntitySerializer();
-
- this.sslContext = sslContext;
- this.trustManager = trustManager;
- okHttpClient = createOkHttpClient(properties);
- }
-
- @Override
- public PreparedRequest prepareRequest(final String method, final Map headers, final Object entity) {
- final boolean gzip = isUseGzip(headers);
- checkContentLengthHeader(method, headers);
- final RequestBody requestBody = createRequestBody(headers, entity, gzip);
-
- final Map updatedHeaders = gzip ? updateHeadersForGzip(headers) : headers;
- return new OkHttpPreparedRequest(method, updatedHeaders, entity, requestBody);
- }
-
- /**
- * Checks the content length header on DELETE requests to ensure it is set to '0', avoiding request timeouts on replicated requests.
- *
- * @param method the HTTP method of the request
- * @param headers the header keys and values
- */
- private void checkContentLengthHeader(String method, Map headers) {
- // Only applies to DELETE requests
- if (HttpMethod.DELETE.equalsIgnoreCase(method)) {
- // Find the Content-Length header if present
- final String CONTENT_LENGTH_HEADER_KEY = "Content-Length";
- Map.Entry contentLengthEntry = headers.entrySet().stream().filter(entry -> entry.getKey().equalsIgnoreCase(CONTENT_LENGTH_HEADER_KEY)).findFirst().orElse(null);
- // If no CL header, do nothing
- if (contentLengthEntry != null) {
- // If the provided CL value is non-zero, override it
- if (contentLengthEntry.getValue() != null && !contentLengthEntry.getValue().equalsIgnoreCase("0")) {
- logger.warn("This is a DELETE request; the provided Content-Length was {}; setting Content-Length to 0", contentLengthEntry.getValue());
- headers.put(CONTENT_LENGTH_HEADER_KEY, "0");
- }
- }
- }
- }
-
- @Override
- public Response replicate(final PreparedRequest request, final String uri) throws IOException {
- if (!(Objects.requireNonNull(request) instanceof OkHttpPreparedRequest)) {
- throw new IllegalArgumentException("Replication Client is only able to replicate requests that the client itself has prepared");
- }
-
- return replicate((OkHttpPreparedRequest) request, uri);
- }
-
- private Response replicate(final OkHttpPreparedRequest request, final String uri) throws IOException {
- logger.debug("Replicating request {} to {}", request, uri);
- final Call call = createCall(request, uri);
- final okhttp3.Response callResponse = call.execute();
-
- final byte[] responseBytes = getResponseBytes(callResponse);
- final MultivaluedMap responseHeaders = getHeaders(callResponse);
- logger.debug("Received response code {} with headers {} for request {} to {}", callResponse.code(), responseHeaders, request, uri);
-
- final Response response = new JacksonResponse(jsonCodec, responseBytes, responseHeaders, URI.create(uri), callResponse.code(), callResponse::close);
- return response;
- }
-
- private MultivaluedMap getHeaders(final okhttp3.Response callResponse) {
- final Headers headers = callResponse.headers();
- final MultivaluedMap headerMap = new MultivaluedHashMap<>();
- for (final String name : headers.names()) {
- final List values = headers.values(name);
- headerMap.addAll(name, values);
- }
-
- return headerMap;
- }
-
- private byte[] getResponseBytes(final okhttp3.Response callResponse) throws IOException {
- final byte[] rawBytes = callResponse.body().bytes();
-
- final String contentEncoding = callResponse.header("Content-Encoding");
- if (gzipEncodings.contains(contentEncoding)) {
- try (final InputStream gzipIn = new GZIPInputStream(new ByteArrayInputStream(rawBytes));
- final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-
- StreamUtils.copy(gzipIn, baos);
- return baos.toByteArray();
- }
- } else {
- return rawBytes;
- }
- }
-
- private Call createCall(final OkHttpPreparedRequest request, final String uri) {
- Request.Builder requestBuilder = new Request.Builder();
-
- final HttpUrl url = buildUrl(request, uri);
- requestBuilder = requestBuilder.url(url);
-
- // build the request body
- final String method = request.getMethod().toUpperCase();
- switch (method) {
- case "POST":
- case "PUT":
- case "PATCH":
- requestBuilder = requestBuilder.method(method, request.getRequestBody());
- break;
- default:
- requestBuilder = requestBuilder.method(method, null);
- break;
- }
-
- // Add appropriate headers
- for (final Map.Entry header : request.getHeaders().entrySet()) {
- requestBuilder = requestBuilder.addHeader(header.getKey(), header.getValue());
- }
-
- // Build the request
- final Request okHttpRequest = requestBuilder.build();
- final Call call = okHttpClient.newCall(okHttpRequest);
- return call;
- }
-
-
- @SuppressWarnings("unchecked")
- private HttpUrl buildUrl(final OkHttpPreparedRequest request, final String uri) {
- HttpUrl.Builder urlBuilder = HttpUrl.parse(uri).newBuilder();
- switch (request.getMethod().toUpperCase()) {
- case HttpMethod.DELETE:
- case HttpMethod.HEAD:
- case HttpMethod.GET:
- case HttpMethod.OPTIONS:
- if (request.getEntity() instanceof MultivaluedMap) {
- final MultivaluedMap entityMap = (MultivaluedMap) request.getEntity();
-
- for (final Entry> queryEntry : entityMap.entrySet()) {
- final String queryName = queryEntry.getKey();
- for (final String queryValue : queryEntry.getValue()) {
- urlBuilder = urlBuilder.addQueryParameter(queryName, queryValue);
- }
- }
- }
-
- break;
- }
-
- return urlBuilder.build();
- }
-
- private RequestBody createRequestBody(final Map headers, final Object entity, final boolean gzip) {
- final String contentType = getContentType(headers, "application/json");
- final byte[] serialized = serializeEntity(entity, contentType, gzip);
-
- final MediaType mediaType = MediaType.parse(contentType);
- return RequestBody.create(serialized, mediaType);
- }
-
- private String getContentType(final Map headers, final String defaultValue) {
- for (final Map.Entry entry : headers.entrySet()) {
- if (entry.getKey().equalsIgnoreCase("content-type")) {
- return entry.getValue();
- }
- }
-
- return defaultValue;
- }
-
- private byte[] serializeEntity(final Object entity, final String contentType, final boolean gzip) {
- try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- final OutputStream out = gzip ? new GZIPOutputStream(baos, 1) : baos) {
-
- getSerializer(contentType).serialize(entity, out);
- out.close();
-
- return baos.toByteArray();
- } catch (final IOException e) {
- // This should never happen with a ByteArrayOutputStream
- throw new RuntimeException("Failed to serialize entity for cluster replication", e);
- }
- }
-
- private EntitySerializer getSerializer(final String contentType) {
- switch (contentType.toLowerCase()) {
- case "application/xml":
- return xmlSerializer;
- case "application/json":
- default:
- return jsonSerializer;
- }
- }
-
-
- private Map updateHeadersForGzip(final Map headers) {
- final String encodingHeader = headers.get("Content-Encoding");
- if (gzipEncodings.contains(encodingHeader)) {
- return headers;
- }
-
- final Map updatedHeaders = new HashMap<>(headers);
- updatedHeaders.put("Content-Encoding", "gzip");
- return updatedHeaders;
- }
-
-
- private boolean isUseGzip(final Map headers) {
- String rawAcceptEncoding = headers.get(HttpHeaders.ACCEPT_ENCODING);
-
- if (rawAcceptEncoding == null) {
- rawAcceptEncoding = headers.get(HttpHeaders.ACCEPT_ENCODING.toLowerCase());
- }
-
- if (rawAcceptEncoding == null) {
- return false;
- } else {
- final String[] acceptEncodingTokens = rawAcceptEncoding.split(",");
- return Stream.of(acceptEncodingTokens)
- .map(String::trim)
- .filter(StringUtils::isNotEmpty)
- .map(String::toLowerCase)
- .anyMatch(gzipEncodings::contains);
- }
- }
-
- private OkHttpClient createOkHttpClient(final NiFiProperties properties) {
- final String connectionTimeout = properties.getClusterNodeConnectionTimeout();
- final long connectionTimeoutMs = (long) FormatUtils.getPreciseTimeDuration(connectionTimeout, TimeUnit.MILLISECONDS);
- final String readTimeout = properties.getClusterNodeReadTimeout();
- final long readTimeoutMs = (long) FormatUtils.getPreciseTimeDuration(readTimeout, TimeUnit.MILLISECONDS);
-
- OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient().newBuilder();
- okHttpClientBuilder.connectTimeout(connectionTimeoutMs, TimeUnit.MILLISECONDS);
- okHttpClientBuilder.readTimeout(readTimeoutMs, TimeUnit.MILLISECONDS);
- okHttpClientBuilder.followRedirects(true);
- final int connectionPoolSize = properties.getClusterNodeMaxConcurrentRequests();
- okHttpClientBuilder.connectionPool(new ConnectionPool(connectionPoolSize, 5, TimeUnit.MINUTES));
- okHttpClientBuilder.eventListener(new RequestReplicationEventListener());
-
- if (sslContext != null) {
- final SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
- okHttpClientBuilder.sslSocketFactory(sslSocketFactory, trustManager);
- }
-
- return okHttpClientBuilder.build();
- }
-}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/RequestReplicationEventListener.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/RequestReplicationEventListener.java
deleted file mode 100644
index 209a10b5ecdb..000000000000
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/RequestReplicationEventListener.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.cluster.coordination.http.replication.okhttp;
-
-import okhttp3.Call;
-import okhttp3.Connection;
-import okhttp3.EventListener;
-import okhttp3.Handshake;
-import okhttp3.Request;
-import okhttp3.Response;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Proxy;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-public class RequestReplicationEventListener extends EventListener {
- private static final Logger logger = LoggerFactory.getLogger(RequestReplicationEventListener.class);
-
- private final ConcurrentMap eventListeners = new ConcurrentHashMap<>();
-
- private CallEventListener getListener(final Call call) {
- return eventListeners.computeIfAbsent(call, CallEventListener::new);
- }
-
- @Override
- public void dnsStart(@NotNull final Call call, @NotNull final String domainName) {
- super.dnsStart(call, domainName);
- getListener(call).dnsStart(domainName);
- }
-
- @Override
- public void dnsEnd(@NotNull final Call call, @NotNull final String domainName, @NotNull final List inetAddressList) {
- super.dnsEnd(call, domainName, inetAddressList);
- getListener(call).dnsEnd(domainName);
- }
-
- @Override
- public void callStart(@NotNull final Call call) {
- super.callStart(call);
- getListener(call).callStart();
- }
-
- @Override
- public void callEnd(@NotNull final Call call) {
- super.callEnd(call);
- final CallEventListener callListener = getListener(call);
- callListener.callEnd();
-
- logTimingInfo(callListener);
- eventListeners.remove(call);
- }
-
- @Override
- public void callFailed(@NotNull final Call call, @NotNull final IOException ioe) {
- super.callFailed(call, ioe);
-
- final CallEventListener callListener = getListener(call);
- callListener.callEnd();
-
- logTimingInfo(callListener);
- eventListeners.remove(call);
- }
-
- @Override
- public void responseBodyStart(@NotNull final Call call) {
- super.responseBodyStart(call);
- getListener(call).responseBodyStart();
- }
-
- @Override
- public void responseBodyEnd(@NotNull final Call call, final long byteCount) {
- super.responseBodyEnd(call, byteCount);
- getListener(call).responseBodyEnd();
- }
-
- @Override
- public void responseFailed(@NotNull final Call call, @NotNull final IOException ioe) {
- super.responseFailed(call, ioe);
- getListener(call).responseBodyEnd();
- }
-
- @Override
- public void responseHeadersStart(@NotNull final Call call) {
- super.responseHeadersStart(call);
- getListener(call).responseHeaderStart();
- }
-
- @Override
- public void responseHeadersEnd(@NotNull final Call call, @NotNull final Response response) {
- super.responseHeadersEnd(call, response);
- getListener(call).responseHeaderEnd();
- }
-
- @Override
- public void requestHeadersStart(@NotNull final Call call) {
- super.requestHeadersStart(call);
- getListener(call).requestHeaderStart();
- }
-
- @Override
- public void requestHeadersEnd(@NotNull final Call call, @NotNull final Request request) {
- super.requestHeadersEnd(call, request);
- getListener(call).requestHeaderEnd();
- }
-
- @Override
- public void requestBodyStart(@NotNull final Call call) {
- super.requestBodyStart(call);
- getListener(call).requestBodyStart();
- }
-
- @Override
- public void requestBodyEnd(@NotNull final Call call, final long byteCount) {
- super.requestBodyEnd(call, byteCount);
- getListener(call).requestBodyEnd();
- }
-
- @Override
- public void requestFailed(@NotNull final Call call, @NotNull final IOException ioe) {
- super.requestFailed(call, ioe);
- getListener(call).requestBodyEnd();
- }
-
- @Override
- public void connectStart(@NotNull final Call call, @NotNull final InetSocketAddress inetSocketAddress, @NotNull final Proxy proxy) {
- super.connectStart(call, inetSocketAddress, proxy);
- getListener(call).connectStart(inetSocketAddress);
- }
-
- @Override
- public void connectionAcquired(@NotNull final Call call, @NotNull final Connection connection) {
- super.connectionAcquired(call, connection);
- getListener(call).connectionAcquired(connection.socket().getRemoteSocketAddress());
- }
-
- @Override
- public void secureConnectStart(@NotNull final Call call) {
- super.secureConnectStart(call);
- getListener(call).secureConnectStart();
- }
-
- @Override
- public void secureConnectEnd(@NotNull final Call call, @Nullable final Handshake handshake) {
- super.secureConnectEnd(call, handshake);
- getListener(call).secureConnectEnd();
- }
-
- private void logTimingInfo(final CallEventListener eventListener) {
- logger.debug("Timing information {}", eventListener);
- }
-}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
index 8135b444cd1e..d0ce10d6f564 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
@@ -245,13 +245,6 @@ private Response createResponse() {
* the content-length. Let the outgoing response builder determine it.
*/
continue;
- } else if (key.equals("X-ClusterContext")) {
- /*
- * do not copy the cluster context to the response because
- * this information is private and should not be sent to
- * the client
- */
- continue;
}
responseBuilder.header(key, value);
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/framework/configuration/FrameworkClusterConfiguration.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/framework/configuration/FrameworkClusterConfiguration.java
index 10ecf5ccd77e..ad4ded63217b 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/framework/configuration/FrameworkClusterConfiguration.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/framework/configuration/FrameworkClusterConfiguration.java
@@ -19,15 +19,17 @@
import org.apache.nifi.cluster.ClusterDetailsFactory;
import org.apache.nifi.cluster.StandardClusterDetailsFactory;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.http.replication.HttpReplicationClient;
import org.apache.nifi.cluster.coordination.http.replication.RequestCompletionCallback;
import org.apache.nifi.cluster.coordination.http.replication.StandardUploadRequestReplicator;
import org.apache.nifi.cluster.coordination.http.replication.ThreadPoolRequestReplicator;
import org.apache.nifi.cluster.coordination.http.replication.UploadRequestReplicator;
-import org.apache.nifi.cluster.coordination.http.replication.okhttp.OkHttpReplicationClient;
+import org.apache.nifi.cluster.coordination.http.replication.client.StandardHttpReplicationClient;
import org.apache.nifi.cluster.lifecycle.ClusterDecommissionTask;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.client.StandardHttpUriBuilder;
import org.apache.nifi.web.client.api.WebClientService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
@@ -87,7 +89,7 @@ public ThreadPoolRequestReplicator requestReplicator(
if (clusterCoordinator == null) {
replicator = null;
} else {
- final OkHttpReplicationClient replicationClient = new OkHttpReplicationClient(properties, sslContext, trustManager);
+ final HttpReplicationClient replicationClient = new StandardHttpReplicationClient(webClientService, StandardHttpUriBuilder::new);
replicator = new ThreadPoolRequestReplicator(
properties.getClusterNodeProtocolMaxPoolSize(),
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
index 6524edec96e3..0db0287e6234 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
@@ -72,6 +72,8 @@
public class TestThreadPoolRequestReplicator {
+ private static final String NODE_CONTINUE = "202-Accepted";
+
@BeforeAll
public static void setupClass() {
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/conf/nifi.properties");
@@ -278,11 +280,11 @@ public void testMultipleRequestWithTwoPhaseCommit() throws Exception {
protected NodeResponse replicateRequest(final PreparedRequest request, final NodeIdentifier nodeId,
final URI uri, final String requestId, final StandardAsyncClusterResponse response) {
// the resource builder will not expose its headers to us, so we are using Mockito's Whitebox class to extract them.
- final Object expectsHeader = request.getHeaders().get(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
+ final Object expectsHeader = request.getHeaders().get(RequestReplicationHeader.VALIDATION_EXPECTS.getHeader());
final int statusCode;
if (requestCount.incrementAndGet() == 1) {
- assertEquals(ThreadPoolRequestReplicator.NODE_CONTINUE, expectsHeader);
+ assertEquals(NODE_CONTINUE, expectsHeader);
statusCode = Status.ACCEPTED.getStatusCode();
} else {
assertNull(expectsHeader);
@@ -343,10 +345,10 @@ public void testOneNodeRejectsTwoPhaseCommit() {
protected NodeResponse replicateRequest(final PreparedRequest request, final NodeIdentifier nodeId,
final URI uri, final String requestId, final StandardAsyncClusterResponse response) {
// the resource builder will not expose its headers to us, so we are using Mockito's Whitebox class to extract them.
- final Object expectsHeader = request.getHeaders().get(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
+ final Object expectsHeader = request.getHeaders().get(RequestReplicationHeader.VALIDATION_EXPECTS.getHeader());
final int requestIndex = requestCount.incrementAndGet();
- assertEquals(ThreadPoolRequestReplicator.NODE_CONTINUE, expectsHeader);
+ assertEquals(NODE_CONTINUE, expectsHeader);
if (requestIndex == 1) {
final Response clientResponse = mock(Response.class);
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/client/TestStandardHttpReplicationClient.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/client/TestStandardHttpReplicationClient.java
new file mode 100644
index 000000000000..485011309a45
--- /dev/null
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/client/TestStandardHttpReplicationClient.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.coordination.http.replication.client;
+
+import jakarta.ws.rs.core.MultivaluedHashMap;
+import jakarta.ws.rs.core.MultivaluedMap;
+import jakarta.ws.rs.core.Response;
+import org.apache.nifi.cluster.coordination.http.replication.PreparedRequest;
+import org.apache.nifi.web.client.StandardHttpUriBuilder;
+import org.apache.nifi.web.client.api.HttpEntityHeaders;
+import org.apache.nifi.web.client.api.HttpRequestBodySpec;
+import org.apache.nifi.web.client.api.HttpRequestUriSpec;
+import org.apache.nifi.web.client.api.HttpResponseEntity;
+import org.apache.nifi.web.client.api.WebClientService;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.net.HttpURLConnection.HTTP_OK;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class TestStandardHttpReplicationClient {
+
+ private static final String GET_METHOD = "GET";
+
+ private static final String POST_METHOD = "POST";
+
+ private static final byte[] EMPTY_MAP_SERIALIZED = new byte[]{123, 125};
+
+ private static final String CONTENT_TYPE_LOWERCASED = "content-type";
+
+ private static final String APPLICATION_JSON = "application/json";
+
+ private static final URI REPLICATE_URI = URI.create("http://localhost/nifi-api/flow/current-user");
+
+ private static final String URI_QUERY = "recursive=false";
+
+ private static final URI REPLICATE_URI_QUERY = URI.create("http://localhost/nifi-api/flow/process-groups/root/status?%s".formatted(URI_QUERY));
+
+ private static final String QUERY_PARAMETER_NAME = "revision";
+
+ private static final String QUERY_PARAMETER_VALUE = "1";
+
+ private static final String QUERY_EXPECTED = "%s=%s".formatted(QUERY_PARAMETER_NAME, QUERY_PARAMETER_VALUE);
+
+ private static final String STATUS_PSEUDO_HEADER = ":status";
+
+ @Mock
+ private WebClientService webClientService;
+
+ @Mock
+ private HttpRequestUriSpec httpRequestUriSpec;
+
+ @Mock
+ private HttpRequestBodySpec httpRequestBodySpec;
+
+ @Mock
+ private HttpResponseEntity httpResponseEntity;
+
+ @Mock
+ private HttpEntityHeaders httpResponseHeaders;
+
+ @Captor
+ private ArgumentCaptor uriCaptor;
+
+ private StandardHttpReplicationClient client;
+
+ @BeforeEach
+ void setClient() {
+ client = new StandardHttpReplicationClient(webClientService, StandardHttpUriBuilder::new);
+ }
+
+ @Test
+ void testPrepareRequest() {
+ final Map headers = Collections.emptyMap();
+ final Map requestEntity = Collections.emptyMap();
+ final PreparedRequest preparedRequest = client.prepareRequest(GET_METHOD, headers, requestEntity);
+
+ assertNotNull(preparedRequest);
+ assertInstanceOf(StandardPreparedRequest.class, preparedRequest);
+
+ assertEquals(GET_METHOD, preparedRequest.getMethod());
+ assertNotEquals(headers, preparedRequest.getHeaders());
+ assertEquals(requestEntity, preparedRequest.getEntity());
+
+ final StandardPreparedRequest standardPreparedRequest = (StandardPreparedRequest) preparedRequest;
+ assertArrayEquals(EMPTY_MAP_SERIALIZED, standardPreparedRequest.requestBody());
+ }
+
+ @Test
+ void testReplicateIllegalArgumentException() {
+ assertThrows(IllegalArgumentException.class, () -> client.replicate(null, REPLICATE_URI));
+ }
+
+ @Test
+ void testReplicate() throws IOException {
+ final Map headers = Map.of(CONTENT_TYPE_LOWERCASED, APPLICATION_JSON);
+ final Map requestEntity = Collections.emptyMap();
+ final PreparedRequest preparedRequest = client.prepareRequest(GET_METHOD, headers, requestEntity);
+
+ when(webClientService.method(any())).thenReturn(httpRequestUriSpec);
+ when(httpRequestUriSpec.uri(any())).thenReturn(httpRequestBodySpec);
+ when(httpRequestBodySpec.header(anyString(), anyString())).thenReturn(httpRequestBodySpec);
+ when(httpRequestBodySpec.retrieve()).thenReturn(httpResponseEntity);
+
+ when(httpResponseEntity.statusCode()).thenReturn(HTTP_OK);
+ when(httpResponseEntity.headers()).thenReturn(httpResponseHeaders);
+
+ final Set responseHeaderNames = Set.of(
+ PreparedRequestHeader.CONTENT_TYPE.getHeader(),
+ PreparedRequestHeader.CONTENT_ENCODING.getHeader(),
+ PreparedRequestHeader.CONTENT_LENGTH.getHeader(),
+ STATUS_PSEUDO_HEADER
+ );
+ when(httpResponseHeaders.getHeaderNames()).thenReturn(responseHeaderNames);
+ when(httpResponseHeaders.getHeader(eq(PreparedRequestHeader.CONTENT_TYPE.getHeader()))).thenReturn(List.of(APPLICATION_JSON));
+
+ final ByteArrayInputStream responseBody = new ByteArrayInputStream(EMPTY_MAP_SERIALIZED);
+ when(httpResponseEntity.body()).thenReturn(responseBody);
+
+ final Response response = client.replicate(preparedRequest, REPLICATE_URI);
+
+ assertResponseFound(response);
+
+ final String responseContentType = response.getHeaderString(PreparedRequestHeader.CONTENT_TYPE.getHeader());
+ assertEquals(APPLICATION_JSON, responseContentType);
+
+ final String responseStatusHeader = response.getHeaderString(STATUS_PSEUDO_HEADER);
+ assertNull(responseStatusHeader);
+
+ final String contentEncodingHeader = response.getHeaderString(PreparedRequestHeader.CONTENT_ENCODING.getHeader());
+ assertNull(contentEncodingHeader);
+
+ final String contentLengthHeader = response.getHeaderString(PreparedRequestHeader.CONTENT_LENGTH.getHeader());
+ assertNull(contentLengthHeader);
+ }
+
+ @Test
+ void testReplicatePostBody() throws IOException {
+ final Map headers = Map.of(CONTENT_TYPE_LOWERCASED, APPLICATION_JSON);
+ final Map requestEntity = Collections.emptyMap();
+ final PreparedRequest preparedRequest = client.prepareRequest(POST_METHOD, headers, requestEntity);
+
+ when(webClientService.method(any())).thenReturn(httpRequestUriSpec);
+ when(httpRequestUriSpec.uri(any())).thenReturn(httpRequestBodySpec);
+ when(httpRequestBodySpec.header(anyString(), anyString())).thenReturn(httpRequestBodySpec);
+ when(httpRequestBodySpec.retrieve()).thenReturn(httpResponseEntity);
+
+ when(httpResponseEntity.statusCode()).thenReturn(HTTP_OK);
+ when(httpResponseEntity.headers()).thenReturn(httpResponseHeaders);
+
+ final ByteArrayInputStream responseBody = new ByteArrayInputStream(EMPTY_MAP_SERIALIZED);
+ when(httpResponseEntity.body()).thenReturn(responseBody);
+
+ final Response response = client.replicate(preparedRequest, REPLICATE_URI);
+
+ assertResponseFound(response);
+ }
+
+ @Test
+ void testReplicateGetMultivaluedMap() throws IOException {
+ final Map headers = Map.of(PreparedRequestHeader.CONTENT_TYPE.getHeader(), APPLICATION_JSON);
+
+ final MultivaluedMap requestEntity = new MultivaluedHashMap<>();
+ requestEntity.add(QUERY_PARAMETER_NAME, QUERY_PARAMETER_VALUE);
+ final PreparedRequest preparedRequest = client.prepareRequest(GET_METHOD, headers, requestEntity);
+
+ when(webClientService.method(any())).thenReturn(httpRequestUriSpec);
+ when(httpRequestUriSpec.uri(any())).thenReturn(httpRequestBodySpec);
+ when(httpRequestBodySpec.header(anyString(), anyString())).thenReturn(httpRequestBodySpec);
+ when(httpRequestBodySpec.retrieve()).thenReturn(httpResponseEntity);
+
+ when(httpResponseEntity.statusCode()).thenReturn(HTTP_OK);
+ when(httpResponseEntity.headers()).thenReturn(httpResponseHeaders);
+
+ final ByteArrayInputStream responseBody = new ByteArrayInputStream(EMPTY_MAP_SERIALIZED);
+ when(httpResponseEntity.body()).thenReturn(responseBody);
+
+ final Response response = client.replicate(preparedRequest, REPLICATE_URI);
+
+ assertResponseFound(response);
+
+ verify(httpRequestUriSpec).uri(uriCaptor.capture());
+
+ final URI requestUri = uriCaptor.getValue();
+ assertEquals(QUERY_EXPECTED, requestUri.getQuery());
+ }
+
+ @Test
+ void testReplicateGetUriQuery() throws IOException {
+ final Map headers = Map.of(PreparedRequestHeader.CONTENT_TYPE.getHeader(), APPLICATION_JSON);
+
+ final PreparedRequest preparedRequest = client.prepareRequest(GET_METHOD, headers, Collections.emptyMap());
+
+ when(webClientService.method(any())).thenReturn(httpRequestUriSpec);
+ when(httpRequestUriSpec.uri(any())).thenReturn(httpRequestBodySpec);
+ when(httpRequestBodySpec.header(anyString(), anyString())).thenReturn(httpRequestBodySpec);
+ when(httpRequestBodySpec.retrieve()).thenReturn(httpResponseEntity);
+
+ when(httpResponseEntity.statusCode()).thenReturn(HTTP_OK);
+ when(httpResponseEntity.headers()).thenReturn(httpResponseHeaders);
+
+ final ByteArrayInputStream responseBody = new ByteArrayInputStream(EMPTY_MAP_SERIALIZED);
+ when(httpResponseEntity.body()).thenReturn(responseBody);
+
+ final Response response = client.replicate(preparedRequest, REPLICATE_URI_QUERY);
+
+ assertResponseFound(response);
+
+ verify(httpRequestUriSpec).uri(uriCaptor.capture());
+
+ final URI requestUri = uriCaptor.getValue();
+ assertEquals(URI_QUERY, requestUri.getQuery());
+ }
+
+ private void assertResponseFound(final Response response) {
+ assertNotNull(response);
+ assertEquals(HTTP_OK, response.getStatus());
+ }
+}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/TestJsonEntitySerializer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/io/TestJsonEntitySerializer.java
similarity index 98%
rename from nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/TestJsonEntitySerializer.java
rename to nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/io/TestJsonEntitySerializer.java
index 982735edab86..9f138724a62d 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/okhttp/TestJsonEntitySerializer.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/io/TestJsonEntitySerializer.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.nifi.cluster.coordination.http.replication.okhttp;
+package org.apache.nifi.cluster.coordination.http.replication.io;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/util/MockReplicationClient.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/util/MockReplicationClient.java
index fbda29d9ded6..922e26c9519d 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/util/MockReplicationClient.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/util/MockReplicationClient.java
@@ -17,7 +17,6 @@
package org.apache.nifi.cluster.coordination.http.replication.util;
-import java.io.IOException;
import java.lang.annotation.Annotation;
import java.net.URI;
import java.util.Collections;
@@ -71,7 +70,7 @@ public Object getEntity() {
}
@Override
- public Response replicate(PreparedRequest request, String uri) throws IOException {
+ public Response replicate(PreparedRequest request, URI uri) {
return new Response() {
@Override
@@ -173,7 +172,7 @@ public Date getLastModified() {
@Override
public URI getLocation() {
- return URI.create(uri);
+ return uri;
}
@Override
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/FlowAnalysisResultEntityMergerTest.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/FlowAnalysisResultEntityMergerTest.java
index a76daf61d74d..ff2c9843f03d 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/FlowAnalysisResultEntityMergerTest.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/FlowAnalysisResultEntityMergerTest.java
@@ -22,7 +22,6 @@
import org.apache.nifi.web.api.dto.FlowAnalysisRuleViolationDTO;
import org.apache.nifi.web.api.dto.PermissionsDTO;
import org.apache.nifi.web.api.entity.FlowAnalysisResultEntity;
-import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -187,13 +186,11 @@ private void testMerge(FlowAnalysisResultEntity clientEntity, Map rules, List ruleViolations) {
FlowAnalysisResultEntity clientEntity = new FlowAnalysisResultEntity();
@@ -236,7 +230,6 @@ private static FlowAnalysisResultEntity resultEntityOf(List
return clientEntity;
}
- @NotNull
private static Map resultEntityMapOf(FlowAnalysisResultEntity clientEntity1, FlowAnalysisResultEntity clientEntity2) {
Map entityMap = new HashMap<>();
@@ -246,7 +239,6 @@ private static Map resultEntityMapOf(F
return entityMap;
}
- @NotNull
private static List listOf(T... items) {
List itemSet = new ArrayList<>();
for (T item : items) {
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/AssetsRestApiClient.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/AssetsRestApiClient.java
index b53a2387a2d3..b8f2052dd514 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/AssetsRestApiClient.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/AssetsRestApiClient.java
@@ -63,7 +63,7 @@ public AssetsEntity getAssets(final String parameterContextId) {
final HttpRequestBodySpec requestBodySpec = webClientService.get()
.uri(requestUri)
.header(ACCEPT_HEADER, APPLICATION_JSON)
- .header(X_REQUEST_REPLICATED_HEADER, "true");
+ .header(REQUEST_REPLICATED_HEADER, Boolean.TRUE.toString());
return executeEntityRequest(requestUri, requestBodySpec, AssetsEntity.class);
}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/client/NiFiRestApiClient.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/client/NiFiRestApiClient.java
index 18171eef6903..7c6fe765b948 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/client/NiFiRestApiClient.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/client/NiFiRestApiClient.java
@@ -42,7 +42,7 @@ public abstract class NiFiRestApiClient {
private static final String HTTPS_SCHEME = "https";
protected static final String ACCEPT_HEADER = "Accept";
- protected static final String X_REQUEST_REPLICATED_HEADER = "X-Request-Replicated";
+ protected static final String REQUEST_REPLICATED_HEADER = "request-replicated";
protected static final String APPLICATION_JSON = "application/json";
protected static final String APPLICATION_OCTET_STREAM = "application/octet-stream";
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/nar/NarRestApiClient.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/nar/NarRestApiClient.java
index e82bd6875100..d3a05518baa3 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/nar/NarRestApiClient.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/nar/NarRestApiClient.java
@@ -65,7 +65,7 @@ public NarSummariesEntity listNarSummaries() {
final HttpRequestBodySpec requestBodySpec = webClientService.get()
.uri(requestUri)
.header(ACCEPT_HEADER, APPLICATION_JSON)
- .header(X_REQUEST_REPLICATED_HEADER, "true");
+ .header(REQUEST_REPLICATED_HEADER, Boolean.TRUE.toString());
return executeEntityRequest(requestUri, requestBodySpec, NarSummariesEntity.class);
}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java
index cfa7f63ce395..83b825073695 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java
@@ -19,6 +19,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.http.replication.RequestReplicationHeader;
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
import org.apache.nifi.cluster.manager.NodeResponse;
@@ -91,7 +92,7 @@ public DownloadableContent getContent(final ContentRequestContext request) {
// replicate the request to the cluster coordinator, indicating the target node
NodeResponse nodeResponse;
try {
- headers.put(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER, nodeId.getId());
+ headers.put(RequestReplicationHeader.REPLICATION_TARGET_ID.getHeader(), nodeId.getId());
final NodeIdentifier coordinatorNode = clusterCoordinator.getElectedActiveCoordinatorNode();
if (coordinatorNode == null) {
throw new NoClusterCoordinatorException();
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
index cb0cf76d33e8..759eecc92b3a 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
@@ -32,6 +32,7 @@
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.http.replication.RequestReplicationHeader;
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
@@ -87,6 +88,7 @@
import org.springframework.security.core.context.SecurityContextHolder;
import javax.net.ssl.SSLPeerUnverifiedException;
+import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.security.cert.X509Certificate;
@@ -222,7 +224,7 @@ protected String generateUuid() {
}
protected Optional getIdGenerationSeed() {
- final String idGenerationSeed = httpServletRequest.getHeader(RequestReplicator.CLUSTER_ID_GENERATION_SEED_HEADER);
+ final String idGenerationSeed = httpServletRequest.getHeader(RequestReplicationHeader.CLUSTER_ID_GENERATION_SEED.getHeader());
if (StringUtils.isBlank(idGenerationSeed)) {
return Optional.empty();
}
@@ -268,7 +270,7 @@ protected ResponseBuilder generateCreatedResponse(final URI uri, final Object en
* @return a 202 Accepted (Node Continue) response to be used within the cluster request handshake
*/
protected ResponseBuilder generateContinueResponse() {
- return Response.status(RequestReplicator.NODE_CONTINUE_STATUS_CODE);
+ return Response.status(HttpURLConnection.HTTP_ACCEPTED);
}
protected URI getAbsolutePath() {
@@ -317,50 +319,15 @@ protected Map getHeaders(final Map overriddenHea
}
}
- // if the scheme is not set by the client, include the details from this request but don't override
- final String proxyScheme = getFirstHeaderValue(ProxyHeader.PROXY_SCHEME.getHeader(), ProxyHeader.FORWARDED_PROTO.getHeader());
- if (proxyScheme == null) {
- result.put(ProxyHeader.PROXY_SCHEME.getHeader(), httpServletRequest.getScheme());
- }
-
- // if the host is not set by the client, include the details from this request but don't override
- final String proxyHost = getFirstHeaderValue(ProxyHeader.PROXY_HOST.getHeader(), ProxyHeader.FORWARDED_HOST.getHeader());
- if (proxyHost == null) {
- result.put(ProxyHeader.PROXY_HOST.getHeader(), httpServletRequest.getServerName());
- }
-
- // if the port is not set by the client, include the details from this request but don't override
- final String proxyPort = getFirstHeaderValue(ProxyHeader.PROXY_PORT.getHeader(), ProxyHeader.FORWARDED_PORT.getHeader());
- if (proxyPort == null) {
- result.put(ProxyHeader.PROXY_PORT.getHeader(), String.valueOf(httpServletRequest.getServerPort()));
- }
+ final URI requestUri = RequestUriBuilder.fromHttpServletRequest(httpServletRequest).build();
+ // Set Proxy Headers based on resolved URI from supported values
+ result.put(ProxyHeader.PROXY_SCHEME.getHeader(), requestUri.getScheme());
+ result.put(ProxyHeader.PROXY_HOST.getHeader(), requestUri.getHost());
+ result.put(ProxyHeader.PROXY_PORT.getHeader(), Integer.toString(requestUri.getPort()));
return result;
}
- /**
- * Returns the value for the first key discovered when inspecting the current request. Will
- * return null if there are no keys specified or if none of the specified keys are found.
- *
- * @param keys http header keys
- * @return the value for the first key found
- */
- private String getFirstHeaderValue(final String... keys) {
- if (keys == null) {
- return null;
- }
-
- for (final String key : keys) {
- final String value = httpServletRequest.getHeader(key);
-
- if (value != null) {
- return value;
- }
- }
-
- return null;
- }
-
/**
* Checks whether the request is part of a two-phase commit style request (either phase 1 or phase 2)
*
@@ -368,7 +335,7 @@ private String getFirstHeaderValue(final String... keys) {
* @return true
if the request represents a two-phase commit style request
*/
protected boolean isTwoPhaseRequest(final HttpServletRequest httpServletRequest) {
- final String transactionId = httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
+ final String transactionId = httpServletRequest.getHeader(RequestReplicationHeader.REQUEST_TRANSACTION_ID.getHeader());
return transactionId != null && isClustered();
}
@@ -383,15 +350,15 @@ protected boolean isTwoPhaseRequest(final HttpServletRequest httpServletRequest)
* first of the two phases.
*/
protected boolean isValidationPhase(final HttpServletRequest httpServletRequest) {
- return isTwoPhaseRequest(httpServletRequest) && httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER) != null;
+ return isTwoPhaseRequest(httpServletRequest) && httpServletRequest.getHeader(RequestReplicationHeader.VALIDATION_EXPECTS.getHeader()) != null;
}
protected boolean isExecutionPhase(final HttpServletRequest httpServletRequest) {
- return isTwoPhaseRequest(httpServletRequest) && httpServletRequest.getHeader(RequestReplicator.REQUEST_EXECUTION_HTTP_HEADER) != null;
+ return isTwoPhaseRequest(httpServletRequest) && httpServletRequest.getHeader(RequestReplicationHeader.EXECUTION_CONTINUE.getHeader()) != null;
}
protected boolean isCancellationPhase(final HttpServletRequest httpServletRequest) {
- return isTwoPhaseRequest(httpServletRequest) && httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_CANCELATION_HTTP_HEADER) != null;
+ return isTwoPhaseRequest(httpServletRequest) && httpServletRequest.getHeader(RequestReplicationHeader.CANCEL_TRANSACTION.getHeader()) != null;
}
/**
@@ -412,9 +379,9 @@ boolean isReplicateRequest() {
return false;
}
- // Check if the X-Request-Replicated header is set. If so, the request has already been replicated,
+ // Check if the replicated header is set. If so, the request has already been replicated,
// so we need to service the request locally. If not, then replicate the request to the entire cluster.
- final String header = httpServletRequest.getHeader(RequestReplicator.REPLICATION_INDICATOR_HEADER);
+ final String header = httpServletRequest.getHeader(RequestReplicationHeader.REQUEST_REPLICATED.getHeader());
return header == null;
}
@@ -719,7 +686,7 @@ private void phaseOneStoreTransaction(final T requestEntity,
}
// get the transaction id
- final String transactionId = httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
+ final String transactionId = httpServletRequest.getHeader(RequestReplicationHeader.REQUEST_TRANSACTION_ID.getHeader());
if (StringUtils.isBlank(transactionId)) {
throw new IllegalArgumentException("Two phase commit Transaction Id missing.");
}
@@ -739,7 +706,7 @@ private void phaseOneStoreTransaction(final T requestEntity,
private Request phaseTwoVerifyTransaction() {
// get the transaction id
- final String transactionId = httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
+ final String transactionId = httpServletRequest.getHeader(RequestReplicationHeader.REQUEST_TRANSACTION_ID.getHeader());
if (StringUtils.isBlank(transactionId)) {
throw new IllegalArgumentException("Two phase commit Transaction Id missing.");
}
@@ -775,7 +742,7 @@ private Request phaseTwoVerifyTransaction() {
private void cancelTransaction() {
// get the transaction id
- final String transactionId = httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
+ final String transactionId = httpServletRequest.getHeader(RequestReplicationHeader.REQUEST_TRANSACTION_ID.getHeader());
if (StringUtils.isBlank(transactionId)) {
throw new IllegalArgumentException("Two phase commit Transaction Id missing.");
}
@@ -891,7 +858,7 @@ protected Response replicate(final URI path, final String method, final Object e
final Set targetNodes = Collections.singleton(nodeId);
return requestReplicator.replicate(targetNodes, method, path, entity, headers, true, true).awaitMergedResponse().getResponse();
} else {
- headers.put(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER, nodeId.getId());
+ headers.put(RequestReplicationHeader.REPLICATION_TARGET_ID.getHeader(), nodeId.getId());
return requestReplicator.forwardToCoordinator(getClusterCoordinatorNode(), method, path, entity, headers).awaitMergedResponse().getResponse();
}
} catch (final InterruptedException ie) {
@@ -936,7 +903,7 @@ protected Response replicate(final String method, final NodeIdentifier targetNod
final Set nodeIds = Collections.singleton(targetNode);
return getRequestReplicator().replicate(nodeIds, method, getAbsolutePath(), entity, getHeaders(), true, true).awaitMergedResponse().getResponse();
} else {
- final Map headers = getHeaders(Collections.singletonMap(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER, targetNode.getId()));
+ final Map headers = getHeaders(Collections.singletonMap(RequestReplicationHeader.REPLICATION_TARGET_ID.getHeader(), targetNode.getId()));
return requestReplicator.forwardToCoordinator(getClusterCoordinatorNode(), method, getAbsolutePath(), entity, headers).awaitMergedResponse().getResponse();
}
} catch (final InterruptedException ie) {
@@ -1049,7 +1016,7 @@ protected NodeResponse replicateNodeResponse(final URI path, final String method
}
} finally {
final long replicateNanos = System.nanoTime() - replicateStart;
- final String transactionId = headers.get(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
+ final String transactionId = headers.get(RequestReplicationHeader.REQUEST_TRANSACTION_ID.getHeader());
final String requestId = transactionId == null ? "Request with no ID" : transactionId;
logger.debug("Took a total of {} millis to {} for {}", TimeUnit.NANOSECONDS.toMillis(replicateNanos), action, requestId);
}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceEventResource.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceEventResource.java
index ff72e7853598..ffd3b72d28aa 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceEventResource.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceEventResource.java
@@ -45,7 +45,7 @@
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
-import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
+import org.apache.nifi.cluster.coordination.http.replication.RequestReplicationHeader;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.repository.claim.ContentDirection;
import org.apache.nifi.web.DownloadableContent;
@@ -451,7 +451,7 @@ public Response submitReplay(
}
// handle expects request (usually from the cluster manager)
- final String expects = httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
+ final String expects = httpServletRequest.getHeader(RequestReplicationHeader.VALIDATION_EXPECTS.getHeader());
if (expects != null) {
return generateContinueResponse().build();
}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/src/main/java/org/apache/nifi/web/security/csrf/SkipReplicatedCsrfFilter.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/src/main/java/org/apache/nifi/web/security/csrf/SkipReplicatedCsrfFilter.java
index 2f22cd5412ec..e617e1c8697a 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/src/main/java/org/apache/nifi/web/security/csrf/SkipReplicatedCsrfFilter.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-security/src/main/java/org/apache/nifi/web/security/csrf/SkipReplicatedCsrfFilter.java
@@ -33,8 +33,8 @@
* Skip Replicated Cross-Site Request Forgery Filter disables subsequent filtering for matched requests
*/
public class SkipReplicatedCsrfFilter extends OncePerRequestFilter {
- /** RequestReplicator.REQUEST_TRANSACTION_ID_HEADER applied to replicated cluster requests */
- protected static final String REPLICATED_REQUEST_HEADER = "X-RequestTransactionId";
+ /** Replication HTTP Header applied to replicated cluster requests */
+ protected static final String REPLICATED_REQUEST_HEADER = "request-transaction-id";
/** Requests containing replicated header and not containing authorization cookies will be skipped */
private static final RequestMatcher REQUEST_MATCHER = new AndRequestMatcher(
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
index bb9b88ccfb88..8bd2d5063f19 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
@@ -77,7 +77,7 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider {
// Group ID | Source Name | Dest Name | Conn Name | Queue Size |
private static final String QUEUE_SIZES_FORMAT = "| %1$-36.36s | %2$-30.30s | %3$-30.30s | %4$-30.30s | %5$-30.30s |";
- public static final RequestConfig DO_NOT_REPLICATE = () -> Collections.singletonMap("X-Request-Replicated", "value");
+ public static final RequestConfig DO_NOT_REPLICATE = () -> Collections.singletonMap("request-replicated", Boolean.TRUE.toString());
public static final int CLUSTERED_CLIENT_API_BASE_PORT = 5671;
public static final int STANDALONE_CLIENT_API_BASE_PORT = 5670;