Skip to content

Commit

Permalink
NIFI-13718 Switched Request Replication to Web Client Service (apache…
Browse files Browse the repository at this point in the history
…#9234)

- Added Request Replication Header enumeration with lowercased header names for HTTP/2
  • Loading branch information
exceptionfactory authored Sep 12, 2024
1 parent 6355812 commit 6e5a276
Show file tree
Hide file tree
Showing 32 changed files with 851 additions and 845 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@
<artifactId>nifi-framework-components</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-client-api</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-client</artifactId>
Expand Down Expand Up @@ -95,11 +100,6 @@
<version>2.0.0-SNAPSHOT</version>
</dependency>

<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
<version>24.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>c2-protocol-component-api</artifactId>
Expand Down Expand Up @@ -207,10 +207,6 @@
<groupId>jakarta.xml.bind</groupId>
<artifactId>jakarta.xml.bind-api</artifactId>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
</dependency>

<!-- spring dependencies -->
<dependency>
Expand Down Expand Up @@ -251,6 +247,18 @@
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<!-- Add Implementation-Version for User-Agent Header in replicated requests -->
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@
import org.apache.nifi.cluster.coordination.http.endpoints.UserGroupsEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.UsersEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.VerifyConfigEndpointMerger;
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.stream.io.NullOutputStream;
import org.apache.nifi.util.FormatUtils;
Expand All @@ -104,6 +103,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -282,7 +282,7 @@ private void drainResponses(final Set<NodeResponse> responses, final NodeRespons
responses.stream()
.parallel() // "parallelize" the draining of the responses, since we have multiple streams to consume
.filter(response -> response != exclude) // don't include the explicitly excluded node
.filter(response -> response.getStatus() != RequestReplicator.NODE_CONTINUE_STATUS_CODE) // don't include any continue responses because they contain no content
.filter(response -> response.getStatus() != HttpURLConnection.HTTP_ACCEPTED) // don't include any continue responses because they contain no content
.forEach(this::drainResponse); // drain all node responses that didn't get filtered out
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.nifi.cluster.coordination.http.replication;

import java.io.IOException;
import java.net.URI;
import java.util.Map;

import jakarta.ws.rs.core.Response;
Expand All @@ -26,6 +27,6 @@ public interface HttpReplicationClient {

PreparedRequest prepareRequest(String method, Map<String, String> headers, Object entity);

Response replicate(PreparedRequest request, String uri) throws IOException;
Response replicate(PreparedRequest request, URI uri) throws IOException;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.coordination.http.replication;

/**
 * HTTP headers used when replicating requests across the cluster. All header names are
 * lowercased so that they remain valid under HTTP/2, which requires lowercase field names.
 */
public enum RequestReplicationHeader {
    /**
     * Signals that a previously created transaction should be cancelled without performing the action
     */
    CANCEL_TRANSACTION("cancel-transaction"),

    /**
     * Seed value used for deterministic generation of cluster identifiers
     */
    CLUSTER_ID_GENERATION_SEED("cluster-id-generation-seed"),

    /**
     * Signals the second phase of the two-phase commit: the node should proceed with the action
     */
    EXECUTION_CONTINUE("execution-continue"),

    /**
     * When a request is replicated through the cluster coordinator, it can be limited to a
     * single node — for example when fetching a Provenance Event known to live on one node.
     * The value is the comma-separated list of node UUIDs (spaces permitted) that the
     * coordinator should replicate the request to.
     */
    REPLICATION_TARGET_ID("replication-target-id"),

    /**
     * Requests are replicated across the cluster only by the cluster coordinator; a node that
     * needs a request replicated first forwards it to the coordinator, which replicates it on
     * the node's behalf. This header marks a request that has already passed through the
     * coordinator, telling the receiving node to service the request rather than proxy it
     * back to the coordinator.
     */
    REQUEST_REPLICATED("request-replicated"),

    /**
     * Transaction Identifier for replicated requests
     */
    REQUEST_TRANSACTION_ID("request-transaction-id"),

    /**
     * Set by the requestor to ask a node whether it can process a given request. The value is
     * always 202-Accepted; the node answers 202 ACCEPTED when it can process the request and
     * 417 EXPECTATION_FAILED otherwise.
     */
    VALIDATION_EXPECTS("validation-expects");

    // Lowercased header name sent on the wire
    private final String headerName;

    RequestReplicationHeader(final String headerName) {
        this.headerName = headerName;
    }

    /**
     * @return the lowercased HTTP header name
     */
    public String getHeader() {
        return headerName;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,48 +26,6 @@
import java.util.Set;

public interface RequestReplicator {

public static final String REQUEST_TRANSACTION_ID_HEADER = "X-RequestTransactionId";
public static final String CLUSTER_ID_GENERATION_SEED_HEADER = "X-Cluster-Id-Generation-Seed";

/**
* The HTTP header that the requestor specifies to ask a node if they are able to process a given request.
* The value is always 202-Accepted. The node will respond with 202 ACCEPTED if it is able to
* process the request, 417 EXPECTATION_FAILED otherwise.
*/
public static final String REQUEST_VALIDATION_HTTP_HEADER = "X-Validation-Expects";
public static final String NODE_CONTINUE = "202-Accepted";
public static final int NODE_CONTINUE_STATUS_CODE = 202;

/**
* Indicates that the request is intended to cancel a transaction that was previously created without performing the action
*/
public static final String REQUEST_TRANSACTION_CANCELATION_HTTP_HEADER = "X-Cancel-Transaction";

/**
* Indicates that this is the second phase of the two phase commit and the execution of the action should proceed.
*/
public static final String REQUEST_EXECUTION_HTTP_HEADER = "X-Execution-Continue";

/**
* When we replicate a request across the cluster, we replicate it only from the cluster coordinator.
* If the request needs to be replicated by another node, it first replicates the request to the coordinator,
* which then replicates the request on the node's behalf. This header name and value are used to denote
* that the request has already been to the cluster coordinator, and the cluster coordinator is the one replicating
* the request. This allows us to know that the request should be serviced, rather than proxied back to the
* cluster coordinator.
*/
public static final String REPLICATION_INDICATOR_HEADER = "X-Request-Replicated";

/**
* When replicating a request to the cluster coordinator, it may be useful to denote that the request should
* be replicated only to a single node. This happens, for instance, when retrieving a Provenance Event that
* we know lives on a specific node. This request must still be replicated through the cluster coordinator.
* This header tells the cluster coordinator the UUID's (comma-separated list, possibly with spaces between)
* of the nodes that the request should be replicated to.
*/
public static final String REPLICATION_TARGET_NODE_UUID_HEADER = "X-Replication-Target-Id";

/**
* Stops the instance from replicating requests. Calling this method on a stopped instance has no effect.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ private <T> T replicateRequest(final NodeIdentifier nodeId, final UploadRequest<
.uri(requestUri)
.body(inputStream, OptionalLong.of(inputStream.available()))
// Special NiFi-specific headers to indicate that the request should be performed and not replicated to the nodes
.header(RequestReplicator.REQUEST_EXECUTION_HTTP_HEADER, "true")
.header(RequestReplicator.REPLICATION_INDICATOR_HEADER, "true")
.header(RequestReplicationHeader.EXECUTION_CONTINUE.getHeader(), Boolean.TRUE.toString())
.header(RequestReplicationHeader.REQUEST_REPLICATED.getHeader(), Boolean.TRUE.toString())
.header(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN, ProxiedEntitiesUtils.buildProxiedEntitiesChainString(user))
.header(ProxiedEntitiesUtils.PROXY_ENTITY_GROUPS, ProxiedEntitiesUtils.buildProxiedEntityGroupsString(user.getIdentityProviderGroups()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
Expand Down Expand Up @@ -98,6 +99,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator, Closeable

private static final String COOKIE_HEADER = "Cookie";
private static final String HOST_HEADER = "Host";
private static final String NODE_CONTINUE = "202-Accepted";

private final int maxConcurrentRequests; // maximum number of concurrent requests
private final HttpResponseMapper responseMapper;
Expand Down Expand Up @@ -276,9 +278,9 @@ public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, final NiFiUse
final boolean indicateReplicated, final boolean performVerification) {
final Map<String, String> updatedHeaders = new HashMap<>(headers);

updatedHeaders.put(RequestReplicator.CLUSTER_ID_GENERATION_SEED_HEADER, ComponentIdGenerator.generateId().toString());
updatedHeaders.put(RequestReplicationHeader.CLUSTER_ID_GENERATION_SEED.getHeader(), ComponentIdGenerator.generateId().toString());
if (indicateReplicated) {
updatedHeaders.put(RequestReplicator.REPLICATION_INDICATOR_HEADER, "true");
updatedHeaders.put(RequestReplicationHeader.REQUEST_REPLICATED.getHeader(), Boolean.TRUE.toString());
}

// include the proxied entities header
Expand Down Expand Up @@ -380,7 +382,7 @@ AsyncClusterResponse replicate(final Set<NodeIdentifier> nodeIds, final String m
// Update headers to indicate the current revision so that we can
// prevent multiple users changing the flow at the same time
final Map<String, String> updatedHeaders = new HashMap<>(headers);
final String requestId = updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID_HEADER, key -> UUID.randomUUID().toString());
final String requestId = computeRequestId(updatedHeaders);

long verifyClusterStateNanos = -1;
if (performVerification) {
Expand Down Expand Up @@ -458,7 +460,7 @@ AsyncClusterResponse replicate(final Set<NodeIdentifier> nodeIds, final String m

// instruct the node to actually perform the underlying action
if (mutableRequest && executionPhase) {
updatedHeaders.put(REQUEST_EXECUTION_HTTP_HEADER, "true");
updatedHeaders.put(RequestReplicationHeader.EXECUTION_CONTINUE.getHeader(), "true");
}

// replicate the request to all nodes
Expand Down Expand Up @@ -486,13 +488,16 @@ AsyncClusterResponse replicate(final Set<NodeIdentifier> nodeIds, final String m
}
}

/**
 * Returns the transaction identifier for this request, reusing the value already present in
 * the request-transaction-id header or generating and storing a new random UUID when absent.
 *
 * @param headers mutable request headers; a generated identifier is written back into this map
 * @return the existing or newly generated transaction identifier
 */
private String computeRequestId(final Map<String, String> headers) {
    final String transactionIdHeader = RequestReplicationHeader.REQUEST_TRANSACTION_ID.getHeader();
    return headers.computeIfAbsent(transactionIdHeader, name -> UUID.randomUUID().toString());
}

private void performVerification(final Set<NodeIdentifier> nodeIds, final String method, final URI uri, final Object entity, final Map<String, String> headers,
final StandardAsyncClusterResponse clusterResponse, final boolean merge, final Object monitor) {
logger.debug("Verifying that mutable request {} {} can be made", method, uri.getPath());

final Map<String, String> validationHeaders = new HashMap<>(headers);
validationHeaders.put(REQUEST_VALIDATION_HTTP_HEADER, NODE_CONTINUE);
validationHeaders.put(RequestReplicationHeader.VALIDATION_EXPECTS.getHeader(), NODE_CONTINUE);

final long startNanos = System.nanoTime();
final int numNodes = nodeIds.size();
Expand Down Expand Up @@ -523,7 +528,7 @@ public void onCompletion(final NodeResponse nodeResponse) {
clusterResponse.addTiming("Verification Completed", "All Nodes", nanos);

// Check if we have any requests that do not have a 202-Accepted status code.
final long dissentingCount = nodeResponses.stream().filter(p -> p.getStatus() != NODE_CONTINUE_STATUS_CODE).count();
final long dissentingCount = nodeResponses.stream().filter(p -> p.getStatus() != HttpURLConnection.HTTP_ACCEPTED).count();

// If all nodes responded with 202-Accepted, then we can replicate the original request
// to all nodes and we are finished.
Expand All @@ -535,7 +540,7 @@ public void onCompletion(final NodeResponse nodeResponse) {

try {
final Map<String, String> cancelLockHeaders = new HashMap<>(headers);
cancelLockHeaders.put(REQUEST_TRANSACTION_CANCELATION_HTTP_HEADER, "true");
cancelLockHeaders.put(RequestReplicationHeader.CANCEL_TRANSACTION.getHeader(), "true");
final Thread cancelLockThread = new Thread(new Runnable() {
@Override
public void run() {
Expand All @@ -554,7 +559,7 @@ public void run() {
// Add a NodeResponse for each node to the Cluster Response
// Check that all nodes responded successfully.
for (final NodeResponse response : nodeResponses) {
if (response.getStatus() != NODE_CONTINUE_STATUS_CODE) {
if (response.getStatus() != HttpURLConnection.HTTP_ACCEPTED) {
final Response clientResponse = response.getClientResponse();

final String message;
Expand Down Expand Up @@ -645,7 +650,7 @@ protected NodeResponse replicateRequest(final PreparedRequest request, final Nod
logger.debug("Replicating request to {} {}, request ID = {}, headers = {}", request.getMethod(), uri, requestId, request.getHeaders());

// invoke the request
response = httpClient.replicate(request, uri.toString());
response = httpClient.replicate(request, uri);

final long nanos = System.nanoTime() - startNanos;
clusterResponse.addTiming("Perform HTTP Request", nodeId.toString(), nanos);
Expand Down Expand Up @@ -866,7 +871,7 @@ public void run() {

try {
// create and send the request
final String requestId = request.getHeaders().get("x-nifi-request-id");
final String requestId = request.getHeaders().get(RequestReplicationHeader.REQUEST_TRANSACTION_ID.getHeader());
logger.debug("Replicating request {} {} to {}", method, uri.getPath(), nodeId);

nodeResponse = replicateRequest(request, nodeId, uri, requestId, clusterResponse);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.coordination.http.replication.client;

/**
 * HTTP headers applied while preparing replicated requests; names are lowercased for HTTP/2
 */
enum PreparedRequestHeader {
    ACCEPT_ENCODING("accept-encoding"),

    CONTENT_ENCODING("content-encoding"),

    CONTENT_LENGTH("content-length"),

    CONTENT_TYPE("content-type"),

    USER_AGENT("user-agent");

    // Lowercased header name sent on the wire
    private final String headerName;

    PreparedRequestHeader(final String headerName) {
        this.headerName = headerName;
    }

    /**
     * @return the lowercased HTTP header name
     */
    String getHeader() {
        return headerName;
    }
}
Loading

0 comments on commit 6e5a276

Please sign in to comment.