Skip to content
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 30 additions & 4 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ constexpr absl::string_view ResponseTrailerLatencyUsField = "response_trailer_la
constexpr absl::string_view ResponseTrailerCallStatusField = "response_trailer_call_status";
constexpr absl::string_view BytesSentField = "bytes_sent";
constexpr absl::string_view BytesReceivedField = "bytes_received";
constexpr absl::string_view ImmediateResponseField = "immediate_response";
constexpr absl::string_view RequestHeaderContinueAndReplaceField =
"request_header_continue_and_replace";
constexpr absl::string_view ResponseHeaderContinueAndReplaceField =
"response_header_continue_and_replace";

absl::optional<ProcessingMode> initProcessingMode(const ExtProcPerRoute& config) {
if (!config.disabled() && config.has_overrides() && config.overrides().has_processing_mode()) {
Expand Down Expand Up @@ -277,17 +282,21 @@ FilterConfig::FilterConfig(const ExternalProcessor& config,
[](Envoy::Event::Dispatcher&) { return std::make_shared<ThreadLocalStreamManager>(); });
}

void ExtProcLoggingInfo::recordGrpcCall(
std::chrono::microseconds latency, Grpc::Status::GrpcStatus call_status,
ProcessorState::CallbackState callback_state,
envoy::config::core::v3::TrafficDirection traffic_direction) {
void ExtProcLoggingInfo::recordGrpcCall(std::chrono::microseconds latency,
Grpc::Status::GrpcStatus call_status,
ProcessorState::CallbackState callback_state,
envoy::config::core::v3::TrafficDirection traffic_direction,
bool continue_and_replace) {
ASSERT(callback_state != ProcessorState::CallbackState::Idle);

// Record the gRPC call stats for the header.
if (callback_state == ProcessorState::CallbackState::HeadersCallback) {
if (grpcCalls(traffic_direction).header_stats_ == nullptr) {
grpcCalls(traffic_direction).header_stats_ = std::make_unique<GrpcCall>(latency, call_status);
}
if (continue_and_replace) {
grpcCalls(traffic_direction).continue_and_replace_ = true;
}
return;
}

Expand Down Expand Up @@ -391,6 +400,11 @@ ProtobufTypes::MessagePtr ExtProcLoggingInfo::serializeAsProto() const {
static_cast<double>(bytes_sent_));
(*struct_msg->mutable_fields())[BytesReceivedField].set_number_value(
static_cast<double>(bytes_received_));
(*struct_msg->mutable_fields())[ImmediateResponseField].set_bool_value(immediate_response_);
(*struct_msg->mutable_fields())[RequestHeaderContinueAndReplaceField].set_bool_value(
decoding_processor_grpc_calls_.continue_and_replace_);
(*struct_msg->mutable_fields())[ResponseHeaderContinueAndReplaceField].set_bool_value(
encoding_processor_grpc_calls_.continue_and_replace_);
return struct_msg;
}

Expand Down Expand Up @@ -495,6 +509,15 @@ ExtProcLoggingInfo::getField(absl::string_view field_name) const {
if (field_name == BytesReceivedField) {
return static_cast<int64_t>(bytes_received_);
}
if (field_name == ImmediateResponseField) {
return bool(immediate_response_);
}
if (field_name == RequestHeaderContinueAndReplaceField) {
return bool(decoding_processor_grpc_calls_.continue_and_replace_);
}
if (field_name == ResponseHeaderContinueAndReplaceField) {
return bool(encoding_processor_grpc_calls_.continue_and_replace_);
}
return {};
}

Expand Down Expand Up @@ -1790,6 +1813,9 @@ void Filter::sendImmediateResponse(const ImmediateResponse& response) {
};

sent_immediate_response_ = true;
if (logging_info_ != nullptr) {
logging_info_->setImmediateResponse();
}
ENVOY_STREAM_LOG(debug, "Sending local reply with status code {}", *decoder_callbacks_,
status_code);
const auto details = StringUtil::replaceAllEmptySpace(response.details());
Expand Down
11 changes: 8 additions & 3 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,16 @@ class ExtProcLoggingInfo : public Envoy::StreamInfo::FilterState::Object {
std::unique_ptr<GrpcCall> header_stats_;
std::unique_ptr<GrpcCall> trailer_stats_;
std::unique_ptr<GrpcCallBody> body_stats_;
bool continue_and_replace_;
};

using GrpcCalls = struct GrpcCallStats;

void recordGrpcCall(std::chrono::microseconds latency, Grpc::Status::GrpcStatus call_status,
ProcessorState::CallbackState callback_state,
envoy::config::core::v3::TrafficDirection traffic_direction);
envoy::config::core::v3::TrafficDirection traffic_direction,
bool continue_and_replace = false);
void setImmediateResponse() { immediate_response_ = true; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe change this into setImmediateResponse(bool immediate_response) {immediate_response_ = immediate_response; } ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I like it it the way it is. Since the default value is false when the variable is created ext_proc.h L 151 (https://github.com/envoyproxy/envoy/pull/41602/files#diff-d95000cdea207e2cf306d30b1706750b57a8228c25f01b0f8b4df0f22bf033c7R151) I think it makes more sense that "set" implies you are changing the value to true.

However, this is not a very strong opinion and I am happy to make the change if you feel strongly it should take in a variable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thought do you ever need to call setImmediateResponse() to set it to false. Small thing, up to you

void setBytesSent(uint64_t bytes_sent) { bytes_sent_ = bytes_sent; }
void setBytesReceived(uint64_t bytes_received) { bytes_received_ = bytes_received; }
void setClusterInfo(absl::optional<Upstream::ClusterInfoConstSharedPtr> cluster_info) {
Expand Down Expand Up @@ -134,8 +137,8 @@ class ExtProcLoggingInfo : public Envoy::StreamInfo::FilterState::Object {

private:
GrpcCalls& grpcCalls(envoy::config::core::v3::TrafficDirection traffic_direction);
GrpcCalls decoding_processor_grpc_calls_;
GrpcCalls encoding_processor_grpc_calls_;
GrpcCalls decoding_processor_grpc_calls_{};
GrpcCalls encoding_processor_grpc_calls_{};
const Envoy::Protobuf::Struct filter_metadata_;
// The following stats are populated for ext_proc filters using Envoy gRPC only.
// The bytes sent and received are for the entire stream.
Expand All @@ -144,6 +147,8 @@ class ExtProcLoggingInfo : public Envoy::StreamInfo::FilterState::Object {
Upstream::HostDescriptionConstSharedPtr upstream_host_;
// The status details of the underlying HTTP/2 stream. Envoy gRPC only.
std::string http_response_code_details_;
// True if an immediate response is sent.
bool immediate_response_{false};
};

class ThreadLocalStreamManager;
Expand Down
9 changes: 6 additions & 3 deletions source/extensions/filters/http/ext_proc/processor_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ void ProcessorState::onStartProcessorCall(Event::TimerCb cb, std::chrono::millis
}

void ProcessorState::onFinishProcessorCall(Grpc::Status::GrpcStatus call_status,
CallbackState next_state) {
CallbackState next_state, bool continue_and_replace) {
ENVOY_STREAM_LOG(debug, "Finish external processing call", *filter_callbacks_);
filter_.logStreamInfo();

Expand All @@ -49,7 +49,8 @@ void ProcessorState::onFinishProcessorCall(Grpc::Status::GrpcStatus call_status,
filter_callbacks_->dispatcher().timeSource().monotonicTime() - call_start_time_.value());
ExtProcLoggingInfo* logging_info = filter_.loggingInfo();
if (logging_info != nullptr) {
logging_info->recordGrpcCall(duration, call_status, callback_state_, trafficDirection());
logging_info->recordGrpcCall(duration, call_status, callback_state_, trafficDirection(),
continue_and_replace);
}
call_start_time_ = absl::nullopt;
}
Expand Down Expand Up @@ -165,10 +166,12 @@ absl::Status ProcessorState::handleHeadersResponse(const HeadersResponse& respon
}

clearRouteCache(common_response);
onFinishProcessorCall(Grpc::Status::Ok, getCallbackStateAfterHeaderResp(common_response));

if (common_response.status() == CommonResponse::CONTINUE_AND_REPLACE) {
onFinishProcessorCall(Grpc::Status::Ok, getCallbackStateAfterHeaderResp(common_response), true);
return handleHeaderContinueAndReplace(response);
} else {
onFinishProcessorCall(Grpc::Status::Ok, getCallbackStateAfterHeaderResp(common_response));
}

filter_.onProcessHeadersResponse(response, absl::OkStatus(), trafficDirection());
Expand Down
3 changes: 2 additions & 1 deletion source/extensions/filters/http/ext_proc/processor_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ class ProcessorState : public Logger::Loggable<Logger::Id::ext_proc> {
void onStartProcessorCall(Event::TimerCb cb, std::chrono::milliseconds timeout,
CallbackState callback_state);
void onFinishProcessorCall(Grpc::Status::GrpcStatus call_status,
CallbackState next_state = CallbackState::Idle);
CallbackState next_state = CallbackState::Idle,
bool continue_and_replace = false);
void stopMessageTimer();
bool restartMessageTimer(const uint32_t message_timeout_ms);

Expand Down
1 change: 1 addition & 0 deletions test/extensions/filters/http/ext_proc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ envoy_extension_cc_test_library(
"//test/test_common:utility_lib",
"@com_google_absl//absl/strings",
"@envoy_api//envoy/config/route/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/access_loggers/file/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/filters/http/ext_proc/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/filters/http/set_metadata/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/filters/network/http_connection_manager/v3:pkg_cc_proto",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <chrono>

#include "envoy/extensions/access_loggers/file/v3/file.pb.h"
#include "envoy/extensions/filters/http/ext_proc/v3/ext_proc.pb.h"
#include "envoy/extensions/filters/http/set_metadata/v3/set_metadata.pb.h"
#include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h"
Expand Down Expand Up @@ -874,6 +875,72 @@ void ExtProcIntegrationTest::prependExprocCompositeFilter() {
true);
}

void ExtProcIntegrationTest::initializeLogConfig(std::string& access_log_path) {
config_helper_.addConfigModifier([&](ConfigHelper::HttpConnectionManager& cm) {
auto* access_log = cm.add_access_log();
access_log->set_name("accesslog");
envoy::extensions::access_loggers::file::v3::FileAccessLog access_log_config;
access_log_config.set_path(access_log_path);
auto* json_format = access_log_config.mutable_log_format()->mutable_json_format();

// Test all three serialization modes.
(*json_format->mutable_fields())["ext_proc_plain"].set_string_value(
"%FILTER_STATE(envoy.filters.http.ext_proc:PLAIN)%");
(*json_format->mutable_fields())["ext_proc_typed"].set_string_value(
"%FILTER_STATE(envoy.filters.http.ext_proc:TYPED)%");

// Test field extraction for coverage.
(*json_format->mutable_fields())["field_request_header_latency"].set_string_value(
"%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:request_header_latency_us)%");
(*json_format->mutable_fields())["field_request_header_status"].set_string_value(
"%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:request_header_call_status)%");
(*json_format->mutable_fields())["field_request_body_calls"].set_string_value(
"%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:request_body_call_count)%");
(*json_format->mutable_fields())["field_request_body_total_latency"].set_string_value(
"%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:request_body_total_latency_us)%");
(*json_format->mutable_fields())["field_request_body_max_latency"].set_string_value(
"%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:request_body_max_latency_us)%");
(*json_format->mutable_fields())["field_request_body_last_status"].set_string_value(
"%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:request_body_last_call_status)%");
(*json_format->mutable_fields())["field_request_trailer_latency"].set_string_value(
"%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:request_trailer_latency_us)%");
(*json_format->mutable_fields())["field_request_trailer_status"].set_string_value(
"%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:request_trailer_call_status)%");
(*json_format->mutable_fields())["field_response_header_latency"].set_string_value(
"%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:response_header_latency_us)%");
(*json_format->mutable_fields())["field_response_header_status"].set_string_value(
"%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:response_header_call_status)%");
(*json_format->mutable_fields())["field_response_body_calls"].set_string_value(
"%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:response_body_call_count)%");
(*json_format->mutable_fields())["field_response_body_total_latency"].set_string_value(
"%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:response_body_total_latency_us)%");
(*json_format->mutable_fields())["field_response_body_max_latency"].set_string_value(
"%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:response_body_max_latency_us)%");
(*json_format->mutable_fields())["field_response_body_last_status"].set_string_value(
"%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:response_body_last_call_status)%");
(*json_format->mutable_fields())["field_response_trailer_latency"].set_string_value(
"%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:response_trailer_latency_us)%");
(*json_format->mutable_fields())["field_response_trailer_status"].set_string_value(
"%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:response_trailer_call_status)%");
(*json_format->mutable_fields())["field_bytes_sent"].set_string_value(
"%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:bytes_sent)%");
(*json_format->mutable_fields())["field_bytes_received"].set_string_value(
"%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:bytes_received)%");
(*json_format->mutable_fields())["field_request_header_cr"].set_string_value(
"%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:request_header_continue_and_replace)%");
(*json_format->mutable_fields())["field_response_header_cr"].set_string_value(
"%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:response_header_continue_and_replace)%");
(*json_format->mutable_fields())["field_immeidate_response"].set_string_value(
"%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:immediate_response)%");

// Test non-existent field for coverage
(*json_format->mutable_fields())["field_non_existent"].set_string_value(
"%FILTER_STATE(envoy.filters.http.ext_proc:FIELD:non_existent_field)%");

access_log->mutable_typed_config()->PackFrom(access_log_config);
});
}

} // namespace ExternalProcessing
} // namespace HttpFilters
} // namespace Extensions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ class ExtProcIntegrationTest : public HttpIntegrationTest,
bool response = false, absl::string_view body_sent = "");
void serverSendTrailerRespDuplexStreamed();
void prependExprocCompositeFilter();
void initializeLogConfig(std::string& access_log_path);

std::unique_ptr<SimpleFilterConfig<DynamicMetadataToHeadersFilter>> simple_filter_config_;
std::unique_ptr<
Expand Down
Loading