Skip to content

Commit

Permalink
Fix grpc transcoding bugs for unknown fields (#797)
Browse files Browse the repository at this point in the history
* Fix grpc transcoding bugs for unknown fields

Signed-off-by: Wayne Zhang <[email protected]>

* fix format

Signed-off-by: Wayne Zhang <[email protected]>

* Add a t test for Skip bug

Signed-off-by: Wayne Zhang <[email protected]>

* fix t test

Signed-off-by: Wayne Zhang <[email protected]>

* fix format

Signed-off-by: Wayne Zhang <[email protected]>

* Replace hard-coded 5 with kFrameSize

Signed-off-by: Wayne Zhang <[email protected]>
  • Loading branch information
qiwzhang authored May 18, 2020
1 parent b33a63d commit b840c62
Show file tree
Hide file tree
Showing 9 changed files with 345 additions and 335 deletions.
40 changes: 37 additions & 3 deletions src/grpc/zero_copy_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,26 +36,29 @@ GrpcZeroCopyInputStream::GrpcZeroCopyInputStream()
: current_buffer_(nullptr),
current_buffer_size_(0),
position_(0),
bytes_read_(0),
finished_(false) {}

void GrpcZeroCopyInputStream::AddMessage(grpc_byte_buffer* message,
void GrpcZeroCopyInputStream::AddMessage(grpc_byte_buffer *message,
bool take_ownership) {
serializer_.AddMessage(message, take_ownership);
}

bool GrpcZeroCopyInputStream::Next(const void** data, int* size) {
bool GrpcZeroCopyInputStream::Next(const void **data, int *size) {
if (position_ >= current_buffer_size_) {
position_ = 0;
if (!serializer_.Next(&current_buffer_, &current_buffer_size_)) {
// No data
*size = 0;
current_buffer_size_ = 0;
return !finished_;
}
position_ = 0;
}

// Return [position_, current_buffer_size_) interval of the current buffer
*data = current_buffer_ + position_;
*size = static_cast<int>(current_buffer_size_ - position_);
bytes_read_ += *size;

// Move the position to the end of the current buffer
position_ = current_buffer_size_;
Expand All @@ -65,9 +68,40 @@ bool GrpcZeroCopyInputStream::Next(const void** data, int* size) {
void GrpcZeroCopyInputStream::BackUp(int count) {
if (0 < count && static_cast<size_t>(count) <= position_) {
position_ -= count;
bytes_read_ -= count;
}
}

bool GrpcZeroCopyInputStream::Skip(int count) {
if (count < 0) {
// Safe guard against wrong usage.
return false;
}
size_t count_left = static_cast<size_t>(count);
while (position_ + count_left > current_buffer_size_) {
// Skipping past the current buffer, read the next one.
int delta = static_cast<int>(current_buffer_size_ - position_);
count_left -= delta;
bytes_read_ += delta;
position_ = 0;
if (!serializer_.Next(&current_buffer_, &current_buffer_size_)) {
// No data. We are potentially not at the end of the stream yet, but we
// don't know that and can only skip to the end and return an error.
current_buffer_size_ = 0;
return false;
}
}

// Move the position ahead the requested number of bytes.
position_ += count_left;
bytes_read_ += count_left;
return true;
}

::google::protobuf::int64 GrpcZeroCopyInputStream::ByteCount() const {
return static_cast<::google::protobuf::int64>(bytes_read_);
}

int64_t GrpcZeroCopyInputStream::BytesAvailable() const {
return (current_buffer_size_ - position_) + serializer_.ByteCount();
}
Expand Down
11 changes: 6 additions & 5 deletions src/grpc/zero_copy_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,26 +44,27 @@ class GrpcZeroCopyInputStream
GrpcZeroCopyInputStream();

// Add a message to the end of the stream
void AddMessage(grpc_byte_buffer* message, bool take_ownership);
void AddMessage(grpc_byte_buffer *message, bool take_ownership);

// Marks the end of the stream, which means that ZeroCopyInputStream will
// return false after all the existing messages are consumed.
void Finish() { finished_ = true; }

// ZeroCopyInputStream implementation

bool Next(const void** data, int* size);
bool Next(const void **data, int *size);
void BackUp(int count);
bool Skip(int count) { return false; } // not supported
::google::protobuf::int64 ByteCount() const { return 0; } // Not implemented
bool Skip(int count);
::google::protobuf::int64 ByteCount() const;
int64_t BytesAvailable() const;
bool Finished() const { return finished_; }

private:
GrpcMessageSerializer serializer_;
const unsigned char* current_buffer_;
const unsigned char *current_buffer_;
size_t current_buffer_size_;
size_t position_;
size_t bytes_read_;
bool finished_;
};

Expand Down
Loading

0 comments on commit b840c62

Please sign in to comment.