-
Couldn't load subscription status.
- Fork 597
Add unit tests for JsonRpcConnection connection handling
#10568
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
d82b7e4
5fa68e6
4bd51fe
5f3a8c6
f0e8191
51d2a85
3468bb1
20ea6d9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,7 +4,6 @@ on: | |
| push: | ||
| branches: | ||
| - master | ||
| pull_request: {} | ||
| release: | ||
| types: | ||
| - published | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,7 +38,7 @@ JsonRpcConnection::JsonRpcConnection(const WaitGroup::Ptr& waitGroup, const Stri | |
| JsonRpcConnection::JsonRpcConnection(const WaitGroup::Ptr& waitGroup, const String& identity, bool authenticated, | ||
| const Shared<AsioTlsStream>::Ptr& stream, ConnectionRole role, boost::asio::io_context& io) | ||
| : m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), m_Role(role), | ||
| m_Timestamp(Utility::GetTime()), m_Seen(Utility::GetTime()), m_IoStrand(io), | ||
| m_Timestamp(Utility::GetTime()), m_Seen(std::chrono::steady_clock::now()), m_IoStrand(io), | ||
| m_OutgoingMessagesQueued(io), m_WriterDone(io), m_ShuttingDown(false), m_WaitGroup(waitGroup), | ||
| m_CheckLivenessTimer(io), m_HeartbeatTimer(io) | ||
| { | ||
|
|
@@ -81,7 +81,7 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc) | |
| break; | ||
| } | ||
|
|
||
| m_Seen = Utility::GetTime(); | ||
| m_Seen = std::chrono::steady_clock::now(); | ||
| if (m_Endpoint) { | ||
| m_Endpoint->AddMessageReceived(jsonString.GetLength()); | ||
| } | ||
|
|
@@ -236,6 +236,11 @@ void JsonRpcConnection::SendRawMessage(const String& message) | |
| }); | ||
| } | ||
|
|
||
| void JsonRpcConnection::SetLivenessTimeout(std::chrono::milliseconds timeout) | ||
| { | ||
| m_LivenessTimeout = timeout; | ||
| } | ||
|
|
||
| void JsonRpcConnection::SendMessageInternal(const Dictionary::Ptr& message) | ||
| { | ||
| if (m_ShuttingDown) { | ||
|
|
@@ -411,31 +416,53 @@ void JsonRpcConnection::CheckLiveness(boost::asio::yield_context yc) | |
| * leaking the connection. Therefore close it after a timeout. | ||
| */ | ||
|
|
||
| m_CheckLivenessTimer.expires_from_now(boost::posix_time::seconds(10)); | ||
| auto anonymousTimeout = m_LivenessTimeout / 6; | ||
| m_CheckLivenessTimer.expires_after(anonymousTimeout); | ||
| m_CheckLivenessTimer.async_wait(yc[ec]); | ||
| if (ec) { | ||
| Log(LogCritical, "JsonRpcConnection") << "Error waiting for Liveness timer: " << ec.message(); | ||
| } | ||
|
|
||
| if (m_ShuttingDown) { | ||
| return; | ||
| } | ||
|
|
||
| auto remote (m_Stream->lowest_layer().remote_endpoint()); | ||
| { | ||
| auto remote(m_Stream->lowest_layer().remote_endpoint()); | ||
|
|
||
| Log(LogInformation, "JsonRpcConnection") | ||
| << "Closing anonymous connection [" << remote.address() << "]:" << remote.port() << " after 10 seconds."; | ||
| auto msg = Log(LogInformation, "JsonRpcConnection"); | ||
| msg << "Closing anonymous connection [" << remote.address() << "]:" << remote.port() << " after "; | ||
| if (anonymousTimeout > 1s) { | ||
| msg << anonymousTimeout.count() / 1000 << " seconds."; | ||
| } else { | ||
| msg << anonymousTimeout.count() << " milliseconds"; | ||
| } | ||
| } | ||
|
|
||
| Disconnect(); | ||
| } else { | ||
| for (;;) { | ||
| m_CheckLivenessTimer.expires_from_now(boost::posix_time::seconds(30)); | ||
| m_CheckLivenessTimer.expires_after(m_LivenessTimeout / 2); | ||
| m_CheckLivenessTimer.async_wait(yc[ec]); | ||
| if (ec) { | ||
| Log(LogCritical, "JsonRpcConnection") << "Error waiting for Liveness timer: " << ec.message(); | ||
| } | ||
|
|
||
| if (m_ShuttingDown) { | ||
| break; | ||
| } | ||
|
|
||
| if (m_Seen < Utility::GetTime() - 60 && (!m_Endpoint || !m_Endpoint->GetSyncing())) { | ||
| Log(LogInformation, "JsonRpcConnection") | ||
| << "No messages for identity '" << m_Identity << "' have been received in the last 60 seconds."; | ||
| if (m_Seen + m_LivenessTimeout < std::chrono::steady_clock::now() && | ||
| (!m_Endpoint || !m_Endpoint->GetSyncing())) { | ||
| { | ||
| auto msg = Log(LogInformation, "JsonRpcConnection"); | ||
| msg << "No messages for identity '" << m_Identity << "' have been received in the last "; | ||
| if (m_LivenessTimeout > 1s) { | ||
| msg << m_LivenessTimeout.count() / 1000 << " seconds."; | ||
| } else { | ||
| msg << m_LivenessTimeout.count() << " milliseconds"; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here! |
||
| } | ||
| } | ||
|
|
||
| Disconnect(); | ||
| break; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,8 @@ | |
| namespace icinga | ||
| { | ||
|
|
||
| using namespace std::chrono_literals; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The last time I've used this language construct inside a 3-line anonymous function(!), @julianbrost insisted on
|
||
|
|
||
| enum ClientRole | ||
| { | ||
| ClientInbound, | ||
|
|
@@ -61,6 +63,9 @@ class JsonRpcConnection final : public Object | |
| void SendMessage(const Dictionary::Ptr& request); | ||
| void SendRawMessage(const String& request); | ||
|
|
||
| void SetLivenessTimeout(std::chrono::milliseconds timeout); | ||
| void SetHeartbeatInterval(std::chrono::milliseconds interval); | ||
|
|
||
| static Value HeartbeatAPIHandler(const intrusive_ptr<MessageOrigin>& origin, const Dictionary::Ptr& params); | ||
|
|
||
| static double GetWorkQueueRate(); | ||
|
|
@@ -74,14 +79,16 @@ class JsonRpcConnection final : public Object | |
| Shared<AsioTlsStream>::Ptr m_Stream; | ||
| ConnectionRole m_Role; | ||
| double m_Timestamp; | ||
| double m_Seen; | ||
| std::chrono::steady_clock::time_point m_Seen; | ||
| boost::asio::io_context::strand m_IoStrand; | ||
| std::vector<String> m_OutgoingMessagesQueue; | ||
| AsioEvent m_OutgoingMessagesQueued; | ||
| AsioEvent m_WriterDone; | ||
| Atomic<bool> m_ShuttingDown; | ||
| WaitGroup::Ptr m_WaitGroup; | ||
| boost::asio::deadline_timer m_CheckLivenessTimer, m_HeartbeatTimer; | ||
| std::chrono::milliseconds m_LivenessTimeout{60s}; | ||
| std::chrono::milliseconds m_HeartbeatInterval{20s}; | ||
| boost::asio::steady_timer m_CheckLivenessTimer, m_HeartbeatTimer; | ||
|
|
||
| JsonRpcConnection(const WaitGroup::Ptr& waitgroup, const String& identity, bool authenticated, | ||
| const Shared<AsioTlsStream>::Ptr& stream, ConnectionRole role, boost::asio::io_context& io); | ||
|
|
@@ -95,8 +102,6 @@ class JsonRpcConnection final : public Object | |
|
|
||
| void MessageHandler(const Dictionary::Ptr& message); | ||
|
|
||
| void CertificateRequestResponseHandler(const Dictionary::Ptr& message); | ||
|
|
||
| void SendMessageInternal(const Dictionary::Ptr& request); | ||
| }; | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to be consistent with its sibling ;).
Also, if you really need the extra scoping for the log, I would also move the
remotevariable into that new scope.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Without it, the message would get printed after the message in Disconnect().