Skip to content

Commit

Permalink
[coro_rpc][log]use stream log (#853)
Browse files Browse the repository at this point in the history
* use stream log

* format
  • Loading branch information
qicosmos authored Dec 13, 2024
1 parent 0029f20 commit 5d1a580
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 103 deletions.
16 changes: 8 additions & 8 deletions include/ylt/coro_rpc/impl/common_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,36 +80,36 @@ inline bool init_ssl_context_helper(asio::ssl::context &context,
auto key_file = fs::path(conf.base_path).append(conf.key_file);
auto dh_file = fs::path(conf.base_path).append(conf.dh_file);

ELOGV(INFO, "current path %s", fs::current_path().string().data());
ELOG_INFO << "current path " << fs::current_path().string();
if (file_exists(cert_file)) {
ELOGV(INFO, "load %s", cert_file.string().data());
ELOG_INFO << "load " << cert_file.string();
context.use_certificate_chain_file(cert_file);
}
else {
ELOGV(ERROR, "no certificate file %s", cert_file.string().data());
ELOG_ERROR << "no certificate file " << cert_file.string();
return false;
}

if (file_exists(key_file)) {
ELOGV(INFO, "load %s", key_file.string().data());
ELOG_INFO << "load " << key_file.string();
context.use_private_key_file(key_file, asio::ssl::context::pem);
}
else {
ELOGV(ERROR, "no private key file %s", key_file.string().data());
ELOG_ERROR << "no private file " << key_file.string();
return false;
}

if (file_exists(dh_file)) {
ELOGV(INFO, "load %s", dh_file.string().data());
ELOG_INFO << "load " << dh_file.string();
context.use_tmp_dh_file(dh_file);
}
else {
ELOGV(INFO, "no temp dh file %s", dh_file.string().data());
ELOG_INFO << "no temp dh file " << dh_file.string();
}

return true;
} catch (std::exception &e) {
ELOGV(INFO, "%s", e.what());
ELOG_INFO << e.what();
return false;
}
}
Expand Down
4 changes: 2 additions & 2 deletions include/ylt/coro_rpc/impl/context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ class context_base {
auto old_flag = self_->status_.exchange(context_status::start_response);
if (old_flag != context_status::init)
AS_UNLIKELY {
ELOGV(ERROR, "response message more than one time");
ELOG_ERROR << "response message more than one time";
return false;
}

if (self_->has_closed())
AS_UNLIKELY {
ELOGV(DEBUG, "response_msg failed: connection has been closed");
ELOG_DEBUG << "response_msg failed: connection has been closed";
return false;
}
return true;
Expand Down
73 changes: 32 additions & 41 deletions include/ylt/coro_rpc/impl/coro_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ class coro_connection : public std::enable_shared_from_this<coro_connection> {
~coro_connection() {
if (!has_closed_) {
#ifdef UNIT_TEST_INJECT
ELOGV(INFO, "~async_connection conn_id %d, client_id %d", conn_id_,
client_id_);
ELOG_INFO << "~async_connection conn_id " << conn_id_ << ", client_id "
<< client_id_;
#endif
close();
}
Expand All @@ -167,18 +167,18 @@ class coro_connection : public std::enable_shared_from_this<coro_connection> {
#ifdef YLT_ENABLE_SSL
if (use_ssl_) {
assert(ssl_stream_);
ELOGV(INFO, "begin to handshake conn_id %d", conn_id_);
ELOG_INFO << "begin to handshake conn_id " << conn_id_;
reset_timer();
auto shake_ec = co_await coro_io::async_handshake(
ssl_stream_, asio::ssl::stream_base::server);
cancel_timer();
if (shake_ec) {
ELOGV(ERROR, "handshake failed: %s conn_id %d",
shake_ec.message().data(), conn_id_);
ELOG_ERROR << "handshake failed: " << shake_ec.message() << " conn_id "
<< conn_id_;
close();
}
else {
ELOGV(INFO, "handshake ok conn_id %d", conn_id_);
ELOG_INFO << "handshake ok conn_id " << conn_id_;
co_await start_impl<rpc_protocol>(router, *ssl_stream_);
}
}
Expand All @@ -204,22 +204,20 @@ class coro_connection : public std::enable_shared_from_this<coro_connection> {
// less than RPC_HEAD_LEN. Incomplete data will be discarded.
// So, no special handling of eof is required.
if (ec) {
ELOGV(INFO, "connection %d close: %s", conn_id_, ec.message().data());
ELOG_INFO << "connection " << conn_id_ << " close: " << ec.message();
close();
break;
}

#ifdef UNIT_TEST_INJECT
client_id_ = req_head_tmp.seq_num;
ELOGV(INFO, "conn_id %d, client_id %d", conn_id_, client_id_);
ELOG_INFO << "conn_id " << conn_id_ << " client_id " << client_id_;
#endif

#ifdef UNIT_TEST_INJECT
if (g_action == inject_action::close_socket_after_read_header) {
ELOGV(WARN,
"inject action: close_socket_after_read_header, conn_id %d, "
"client_id %d",
conn_id_, client_id_);
ELOG_WARN << "inject action: close_socket_after_read_header, conn_id "
<< conn_id_ << ", client_id " << client_id_;
close();
break;
}
Expand Down Expand Up @@ -250,7 +248,7 @@ class coro_connection : public std::enable_shared_from_this<coro_connection> {

if (!serialize_proto.has_value())
AS_UNLIKELY {
ELOGV(ERROR, "bad serialize protocol type, conn_id %d", conn_id_);
ELOG_ERROR << "bad serialize protocol type, conn_id " << conn_id_;
close();
break;
}
Expand All @@ -265,8 +263,8 @@ class coro_connection : public std::enable_shared_from_this<coro_connection> {

if (ec)
AS_UNLIKELY {
ELOGV(ERROR, "read error: %s, conn_id %d", ec.message().data(),
conn_id_);
ELOG_ERROR << "read error: " << ec.message() << ", conn_id "
<< conn_id_;
close();
break;
}
Expand Down Expand Up @@ -317,20 +315,18 @@ class coro_connection : public std::enable_shared_from_this<coro_connection> {
}
#ifdef UNIT_TEST_INJECT
if (g_action == inject_action::close_socket_after_send_length) {
ELOGV(WARN, "inject action: close_socket_after_send_length", conn_id_,
client_id_);
ELOG_WARN
<< "inject action: close_socket_after_send_length , conn_id "
<< conn_id_ << ", client_id " << client_id_;
std::string header_buf = rpc_protocol::prepare_response(
resp_buf, req_head, 0, resp_err, "");
co_await coro_io::async_write(socket, asio::buffer(header_buf));
close();
break;
}
if (g_action == inject_action::server_send_bad_rpc_result) {
ELOGV(
WARN,
"inject action: server_send_bad_rpc_result conn_id %d, client_id "
"%d",
conn_id_, client_id_);
ELOG_WARN << "inject action: server_send_bad_rpc_result , conn_id "
<< conn_id_ << ", client_id " << client_id_;
resp_buf[0] = resp_buf[0] + 1;
}
#endif
Expand Down Expand Up @@ -362,8 +358,7 @@ class coro_connection : public std::enable_shared_from_this<coro_connection> {
if (resp_err) {
resp_error_msg = std::move(resp_buf);
resp_buf = {};
ELOGV(WARNING, "rpc route/execute error, error msg: %s",
resp_error_msg.data());
ELOG_WARN << "rpc route/execute error, error msg: " << resp_error_msg;
}
std::string header_buf = rpc_protocol::prepare_response(
resp_buf, req_head, attachment().length(), resp_err, resp_error_msg);
Expand Down Expand Up @@ -460,12 +455,12 @@ class coro_connection : public std::enable_shared_from_this<coro_connection> {
rpc_conn self) noexcept {
if (has_closed())
AS_UNLIKELY {
ELOGV(DEBUG, "response_msg failed: connection has been closed");
ELOG_DEBUG << "response_msg failed: connection has been closed";
co_return;
}
#ifdef UNIT_TEST_INJECT
if (g_action == inject_action::close_socket_after_send_length) {
ELOGV(WARN, "inject action: close_socket_after_send_length");
ELOG_WARN << "inject action: close_socket_after_send_length";
body_buf.clear();
}
#endif
Expand All @@ -488,11 +483,9 @@ class coro_connection : public std::enable_shared_from_this<coro_connection> {
auto &msg = write_queue_.front();
#ifdef UNIT_TEST_INJECT
if (g_action == inject_action::force_inject_connection_close_socket) {
ELOGV(
WARN,
"inject action: force_inject_connection_close_socket, conn_id %d, "
"client_id %d",
conn_id_, client_id_);
ELOG_WARN
<< "inject action: force_inject_connection_close_socket , conn_id "
<< conn_id_ << ", client_id " << client_id_;
close();
co_return;
}
Expand Down Expand Up @@ -535,19 +528,17 @@ class coro_connection : public std::enable_shared_from_this<coro_connection> {
}
if (ret.first)
AS_UNLIKELY {
ELOGV(ERROR, "%s, %s", ret.first.message().data(),
"async_write error");
ELOG_ERROR << ret.first.message() << ", "
<< "async_write error";
close();
co_return;
}
write_queue_.pop_front();
}
#ifdef UNIT_TEST_INJECT
if (g_action == inject_action::close_socket_after_send_length) {
ELOGV(INFO,
"inject action: close_socket_after_send_length, conn_id %d, "
"client_id %d",
conn_id_, client_id_);
ELOG_INFO << "inject action: close_socket_after_send_length , conn_id "
<< conn_id_ << ", client_id " << client_id_;
// Attention: close ssl stream after read error
// otherwise, server will crash
close();
Expand All @@ -557,7 +548,7 @@ class coro_connection : public std::enable_shared_from_this<coro_connection> {
}

void close() {
ELOGV(TRACE, "connection closed");
ELOG_TRACE << "connection closed";
if (has_closed_) {
return;
}
Expand All @@ -580,10 +571,10 @@ class coro_connection : public std::enable_shared_from_this<coro_connection> {
[this, self = shared_from_this()](asio::error_code const &ec) {
if (!ec) {
#ifdef UNIT_TEST_INJECT
ELOGV(INFO, "close timeout client_id %d conn_id %d", client_id_,
conn_id_);
ELOG_INFO << "close timeout client client_id " << client_id_
<< ", conn_id " << conn_id_;
#else
ELOGV(INFO, "close timeout client conn_id %d", conn_id_);
ELOG_INFO << "close timeout client conn_id " << conn_id_;
#endif

close();
Expand Down
41 changes: 20 additions & 21 deletions include/ylt/coro_rpc/impl/coro_rpc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -344,13 +344,13 @@ class coro_rpc_client {
uint32_t get_client_id() const { return config_.client_id; }

void close() {
// ELOGV(INFO, "client_id %d close", config_.client_id);
// ELOG_INFO << "client_id " << config_.client_id << " close";
close_socket(control_);
}

bool set_req_attachment(std::string_view attachment) {
if (attachment.size() > UINT32_MAX) {
ELOGV(ERROR, "too large rpc attachment");
ELOG_ERROR << "too large rpc attachment";
return false;
}
req_attachment_ = attachment;
Expand Down Expand Up @@ -399,8 +399,8 @@ class coro_rpc_client {
#endif
control_->has_closed_ = false;

ELOGV(INFO, "client_id %d begin to connect %s", config_.client_id,
config_.port.data());
ELOG_INFO << "client_id " << config_.client_id << " begin to connect "
<< config_.port;
auto conn_timeout_dur = *config_.connect_timeout_duration;
if (conn_timeout_dur.count() >= 0) {
timeout(*this->timer_, conn_timeout_dur, "connect timer canceled")
Expand All @@ -421,7 +421,7 @@ class coro_rpc_client {
}

if (control_->is_timeout_) {
ELOGV(WARN, "client_id %d connect timeout", config_.client_id);
ELOG_WARN << "client_id " << config_.client_id << " connect timeout";
co_return errc::timed_out;
}
if (config_.enable_tcp_no_delay == true) {
Expand All @@ -434,8 +434,8 @@ class coro_rpc_client {
auto shake_ec = co_await coro_io::async_handshake(
control_->ssl_stream_, asio::ssl::stream_base::client);
if (shake_ec) {
ELOGV(WARN, "client_id %d handshake failed: %s", config_.client_id,
shake_ec.message().data());
ELOG_WARN << "client_id " << config_.client_id
<< " handshake failed: " << shake_ec.message();
co_return errc::not_connected;
}
}
Expand All @@ -447,16 +447,15 @@ class coro_rpc_client {
[[nodiscard]] bool init_ssl_impl() {
try {
ssl_init_ret_ = false;
ELOGV(INFO, "init ssl: %s", config_.ssl_domain.data());
ELOG_INFO << "init ssl: " << config_.ssl_domain;
auto &cert_file = config_.ssl_cert_path;
ELOGV(INFO, "current path %s",
std::filesystem::current_path().string().data());
ELOG_INFO << "current path: " << std::filesystem::current_path().string();
if (file_exists(cert_file)) {
ELOGV(INFO, "load %s", cert_file.string().data());
ELOG_INFO << "load " << cert_file.string();
ssl_ctx_.load_verify_file(cert_file);
}
else {
ELOGV(INFO, "no certificate file %s", cert_file.string().data());
ELOG_INFO << "no certificate file " << cert_file.string();
return ssl_init_ret_;
}
ssl_ctx_.set_verify_mode(asio::ssl::verify_peer);
Expand All @@ -467,7 +466,7 @@ class coro_rpc_client {
control_->socket_, ssl_ctx_);
ssl_init_ret_ = true;
} catch (std::exception &e) {
ELOGV(ERROR, "init ssl failed: %s", e.what());
ELOG_ERROR << "init ssl failed: " << e.what();
}
return ssl_init_ret_;
}
Expand Down Expand Up @@ -573,7 +572,7 @@ class coro_rpc_client {
#endif
auto sz = buffer.size() - coro_rpc_protocol::REQ_HEAD_LEN;
if (sz > UINT32_MAX) {
ELOGV(ERROR, "too large rpc body");
ELOG_ERROR << "too large rpc body";
return {};
}
header.length = sz;
Expand Down Expand Up @@ -620,7 +619,7 @@ class coro_rpc_client {
}
has_error = true;
// deserialize failed.
ELOGV(WARNING, "deserilaize rpc result failed");
ELOG_WARN << "deserilaize rpc result failed";
err = {errc::invalid_rpc_result, "failed to deserialize rpc return value"};
return rpc_result<T>{unexpect_t{}, std::move(err)};
}
Expand Down Expand Up @@ -747,7 +746,7 @@ class coro_rpc_client {

if (control_->has_closed_)
AS_UNLIKELY {
ELOGV(ERROR, "client has been closed, please re-connect");
ELOG_ERROR << "client has been closed, please re-connect";
co_return rpc_error{errc::io_error,
"client has been closed, please re-connect"};
}
Expand Down Expand Up @@ -1040,7 +1039,7 @@ class coro_rpc_client {
if (g_action == inject_action::client_close_socket_after_send_header) {
ret = co_await coro_io::async_write(
socket, asio::buffer(buffer.data(), coro_rpc_protocol::REQ_HEAD_LEN));
ELOGV(INFO, "client_id %d close socket", config_.client_id);
ELOG_INFO << "client_id " << config_.client_id << " close socket";
close();
co_return rpc_error{errc::io_error, ret.first.message()};
}
Expand All @@ -1049,15 +1048,15 @@ class coro_rpc_client {
ret = co_await coro_io::async_write(
socket,
asio::buffer(buffer.data(), coro_rpc_protocol::REQ_HEAD_LEN - 1));
ELOGV(INFO, "client_id %d close socket", config_.client_id);
ELOG_INFO << "client_id " << config_.client_id << " close socket";
close();
co_return rpc_error{errc::io_error, ret.first.message()};
}
else if (g_action ==
inject_action::client_shutdown_socket_after_send_header) {
ret = co_await coro_io::async_write(
socket, asio::buffer(buffer.data(), coro_rpc_protocol::REQ_HEAD_LEN));
ELOGV(INFO, "client_id %d shutdown", config_.client_id);
ELOG_INFO << "client_id " << config_.client_id << " shutdown";
control_->socket_.shutdown(asio::ip::tcp::socket::shutdown_send);
co_return rpc_error{errc::io_error, ret.first.message()};
}
Expand Down Expand Up @@ -1105,8 +1104,8 @@ class coro_rpc_client {
#endif
#ifdef UNIT_TEST_INJECT
if (g_action == inject_action::client_close_socket_after_send_payload) {
ELOGV(INFO, "client_id %d client_close_socket_after_send_payload",
config_.client_id);
ELOG_INFO << "client_id " << config_.client_id
<< " client_close_socket_after_send_payload";
close();
co_return rpc_error{errc::io_error, ret.first.message()};
}
Expand Down
Loading

0 comments on commit 5d1a580

Please sign in to comment.