Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 45 additions & 5 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ constexpr absl::string_view ResponseTrailerLatencyUsField = "response_trailer_la
constexpr absl::string_view ResponseTrailerCallStatusField = "response_trailer_call_status";
constexpr absl::string_view BytesSentField = "bytes_sent";
constexpr absl::string_view BytesReceivedField = "bytes_received";
constexpr absl::string_view ImmediateResponseField = "immediate_response";
constexpr absl::string_view RequestHeaderContinueAndReplaceField =
"request_header_continue_and_replace";
constexpr absl::string_view ResponseHeaderContinueAndReplaceField =
"response_header_continue_and_replace";
constexpr absl::string_view FailedOpenField = "failed_open";
constexpr absl::string_view ServerHalfClosedField = "server_half_closed";

absl::optional<ProcessingMode> initProcessingMode(const ExtProcPerRoute& config) {
if (!config.disabled() && config.has_overrides() && config.overrides().has_processing_mode()) {
Expand Down Expand Up @@ -277,17 +284,21 @@ FilterConfig::FilterConfig(const ExternalProcessor& config,
[](Envoy::Event::Dispatcher&) { return std::make_shared<ThreadLocalStreamManager>(); });
}

void ExtProcLoggingInfo::recordGrpcCall(
std::chrono::microseconds latency, Grpc::Status::GrpcStatus call_status,
ProcessorState::CallbackState callback_state,
envoy::config::core::v3::TrafficDirection traffic_direction) {
void ExtProcLoggingInfo::recordGrpcCall(std::chrono::microseconds latency,
Grpc::Status::GrpcStatus call_status,
ProcessorState::CallbackState callback_state,
envoy::config::core::v3::TrafficDirection traffic_direction,
bool continue_and_replace) {
ASSERT(callback_state != ProcessorState::CallbackState::Idle);

// Record the gRPC call stats for the header.
if (callback_state == ProcessorState::CallbackState::HeadersCallback) {
if (grpcCalls(traffic_direction).header_stats_ == nullptr) {
grpcCalls(traffic_direction).header_stats_ = std::make_unique<GrpcCall>(latency, call_status);
}
if (continue_and_replace) {
grpcCalls(traffic_direction).continue_and_replace_ = true;
}
return;
}

Expand Down Expand Up @@ -391,6 +402,13 @@ ProtobufTypes::MessagePtr ExtProcLoggingInfo::serializeAsProto() const {
static_cast<double>(bytes_sent_));
(*struct_msg->mutable_fields())[BytesReceivedField].set_number_value(
static_cast<double>(bytes_received_));
(*struct_msg->mutable_fields())[ImmediateResponseField].set_bool_value(immediate_response_);
(*struct_msg->mutable_fields())[RequestHeaderContinueAndReplaceField].set_bool_value(
decoding_processor_grpc_calls_.continue_and_replace_);
(*struct_msg->mutable_fields())[ResponseHeaderContinueAndReplaceField].set_bool_value(
encoding_processor_grpc_calls_.continue_and_replace_);
(*struct_msg->mutable_fields())[FailedOpenField].set_bool_value(failed_open_);
(*struct_msg->mutable_fields())[ServerHalfClosedField].set_bool_value(server_half_closed_);
return struct_msg;
}

Expand Down Expand Up @@ -495,6 +513,21 @@ ExtProcLoggingInfo::getField(absl::string_view field_name) const {
if (field_name == BytesReceivedField) {
return static_cast<int64_t>(bytes_received_);
}
if (field_name == ImmediateResponseField) {
return bool(immediate_response_);
}
if (field_name == RequestHeaderContinueAndReplaceField) {
return bool(decoding_processor_grpc_calls_.continue_and_replace_);
}
if (field_name == ResponseHeaderContinueAndReplaceField) {
return bool(encoding_processor_grpc_calls_.continue_and_replace_);
}
if (field_name == FailedOpenField) {
return bool(failed_open_);
}
if (field_name == ServerHalfClosedField) {
return bool(server_half_closed_);
}
return {};
}

Expand Down Expand Up @@ -598,6 +631,7 @@ void Filter::onError() {
// Close the external processing.
processing_complete_ = true;
stats_.failure_mode_allowed_.inc();
logging_info_->setFailedOpen();
clearAsyncState();
} else {
// Return an error and stop processing the current stream.
Expand Down Expand Up @@ -1669,6 +1703,7 @@ void Filter::onReceiveMessage(std::unique_ptr<ProcessingResponse>&& r) {
// ignore it and also ignore the stream for the rest of this filter
// instance's lifetime to protect us from a malformed server.
stats_.failure_mode_allowed_.inc();
logging_info_->setFailedOpen();
closeStream();
clearAsyncState(processing_status.raw_code());
processing_complete_ = true;
Expand Down Expand Up @@ -1696,7 +1731,7 @@ void Filter::onGrpcError(Grpc::Status::GrpcStatus status, const std::string& mes
if (failureModeAllow()) {
onGrpcCloseWithStatus(status);
stats_.failure_mode_allowed_.inc();

logging_info_->setFailedOpen();
} else {
processing_complete_ = true;
// Since the stream failed, there is no need to handle timeouts, so
Expand All @@ -1719,6 +1754,7 @@ void Filter::onGrpcCloseWithStatus(Grpc::Status::GrpcStatus status) {

processing_complete_ = true;
stats_.streams_closed_.inc();
logging_info_->setServerHalfClose();
// Successful close. We can ignore the stream for the rest of our request
// and response processing.
closeStream();
Expand All @@ -1737,6 +1773,7 @@ void Filter::onMessageTimeout() {
processing_complete_ = true;
closeStream();
stats_.failure_mode_allowed_.inc();
logging_info_->setFailedOpen();
clearAsyncState(Grpc::Status::DeadlineExceeded);

} else {
Expand Down Expand Up @@ -1790,6 +1827,9 @@ void Filter::sendImmediateResponse(const ImmediateResponse& response) {
};

sent_immediate_response_ = true;
if (logging_info_ != nullptr) {
logging_info_->setImmediateResponse();
}
ENVOY_STREAM_LOG(debug, "Sending local reply with status code {}", *decoder_callbacks_,
status_code);
const auto details = StringUtil::replaceAllEmptySpace(response.details());
Expand Down
17 changes: 14 additions & 3 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,18 @@ class ExtProcLoggingInfo : public Envoy::StreamInfo::FilterState::Object {
std::unique_ptr<GrpcCall> header_stats_;
std::unique_ptr<GrpcCall> trailer_stats_;
std::unique_ptr<GrpcCallBody> body_stats_;
bool continue_and_replace_;
};

using GrpcCalls = struct GrpcCallStats;

void recordGrpcCall(std::chrono::microseconds latency, Grpc::Status::GrpcStatus call_status,
ProcessorState::CallbackState callback_state,
envoy::config::core::v3::TrafficDirection traffic_direction);
envoy::config::core::v3::TrafficDirection traffic_direction,
bool continue_and_replace = false);
void setImmediateResponse() { immediate_response_ = true; }
void setFailedOpen() { failed_open_ = true; }
void setServerHalfClose() { server_half_closed_ = true; }
void setBytesSent(uint64_t bytes_sent) { bytes_sent_ = bytes_sent; }
void setBytesReceived(uint64_t bytes_received) { bytes_received_ = bytes_received; }
void setClusterInfo(absl::optional<Upstream::ClusterInfoConstSharedPtr> cluster_info) {
Expand Down Expand Up @@ -134,8 +139,8 @@ class ExtProcLoggingInfo : public Envoy::StreamInfo::FilterState::Object {

private:
GrpcCalls& grpcCalls(envoy::config::core::v3::TrafficDirection traffic_direction);
GrpcCalls decoding_processor_grpc_calls_;
GrpcCalls encoding_processor_grpc_calls_;
GrpcCalls decoding_processor_grpc_calls_{};
GrpcCalls encoding_processor_grpc_calls_{};
const Envoy::Protobuf::Struct filter_metadata_;
// The following stats are populated for ext_proc filters using Envoy gRPC only.
// The bytes sent and received are for the entire stream.
Expand All @@ -144,6 +149,12 @@ class ExtProcLoggingInfo : public Envoy::StreamInfo::FilterState::Object {
Upstream::HostDescriptionConstSharedPtr upstream_host_;
// The status details of the underlying HTTP/2 stream. Envoy gRPC only.
std::string http_response_code_details_;
// True if an immediate response is sent.
bool immediate_response_{false};
// True if the stream failed open.
bool failed_open_{false};
// True if the external processing server closed the stream before the client.
bool server_half_closed_{false};
};

class ThreadLocalStreamManager;
Expand Down
9 changes: 6 additions & 3 deletions source/extensions/filters/http/ext_proc/processor_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ void ProcessorState::onStartProcessorCall(Event::TimerCb cb, std::chrono::millis
}

void ProcessorState::onFinishProcessorCall(Grpc::Status::GrpcStatus call_status,
CallbackState next_state) {
CallbackState next_state, bool continue_and_replace) {
ENVOY_STREAM_LOG(debug, "Finish external processing call", *filter_callbacks_);
filter_.logStreamInfo();

Expand All @@ -49,7 +49,8 @@ void ProcessorState::onFinishProcessorCall(Grpc::Status::GrpcStatus call_status,
filter_callbacks_->dispatcher().timeSource().monotonicTime() - call_start_time_.value());
ExtProcLoggingInfo* logging_info = filter_.loggingInfo();
if (logging_info != nullptr) {
logging_info->recordGrpcCall(duration, call_status, callback_state_, trafficDirection());
logging_info->recordGrpcCall(duration, call_status, callback_state_, trafficDirection(),
continue_and_replace);
}
call_start_time_ = absl::nullopt;
}
Expand Down Expand Up @@ -165,10 +166,12 @@ absl::Status ProcessorState::handleHeadersResponse(const HeadersResponse& respon
}

clearRouteCache(common_response);
onFinishProcessorCall(Grpc::Status::Ok, getCallbackStateAfterHeaderResp(common_response));

if (common_response.status() == CommonResponse::CONTINUE_AND_REPLACE) {
onFinishProcessorCall(Grpc::Status::Ok, getCallbackStateAfterHeaderResp(common_response), true);
return handleHeaderContinueAndReplace(response);
} else {
onFinishProcessorCall(Grpc::Status::Ok, getCallbackStateAfterHeaderResp(common_response));
}

filter_.onProcessHeadersResponse(response, absl::OkStatus(), trafficDirection());
Expand Down
3 changes: 2 additions & 1 deletion source/extensions/filters/http/ext_proc/processor_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ class ProcessorState : public Logger::Loggable<Logger::Id::ext_proc> {
void onStartProcessorCall(Event::TimerCb cb, std::chrono::milliseconds timeout,
CallbackState callback_state);
void onFinishProcessorCall(Grpc::Status::GrpcStatus call_status,
CallbackState next_state = CallbackState::Idle);
CallbackState next_state = CallbackState::Idle,
bool continue_and_replace = false);
void stopMessageTimer();
bool restartMessageTimer(const uint32_t message_timeout_ms);

Expand Down
Loading