Skip to content
35 changes: 14 additions & 21 deletions src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,9 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener
util::UniqueFunction<ProgressHandler> m_progress_handler;
util::UniqueFunction<ConnectionStateChangeListener> m_connection_state_change_listener;

std::function<SyncClientHookAction(SyncClientHookData data)> m_debug_hook;
bool m_in_debug_hook = false;
// This gets passed to the SessionImpl constructor and owned by the SessionImpl after
// actualization so that it can outlive the SessionWrapper.
std::function<SyncClientHookAction(SyncClientHookData data)> m_debug_hook_for_sess_impl;

SessionReason m_session_reason;

Expand Down Expand Up @@ -984,19 +985,16 @@ void SessionImpl::on_flx_sync_version_complete(int64_t version)

SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data)
{
// Should never be called if session is not active
REALM_ASSERT_EX(m_state == State::Active, m_state);

// Make sure we don't call the debug hook recursively.
if (m_wrapper.m_in_debug_hook) {
if (m_in_debug_hook) {
return SyncClientHookAction::NoAction;
}
m_wrapper.m_in_debug_hook = true;
m_in_debug_hook = true;
auto in_hook_guard = util::make_scope_exit([&]() noexcept {
m_wrapper.m_in_debug_hook = false;
m_in_debug_hook = false;
});

auto action = m_wrapper.m_debug_hook(data);
auto action = m_debug_hook(data);
switch (action) {
case realm::SyncClientHookAction::SuspendWithRetryableError: {
SessionErrorInfo err_info(Status{ErrorCodes::RuntimeError, "hook requested error"}, IsFatal{false});
Expand All @@ -1019,13 +1017,9 @@ SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, con
int64_t query_version, DownloadBatchState batch_state,
size_t num_changesets)
{
if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
return SyncClientHookAction::NoAction;
}
if (REALM_UNLIKELY(m_state != State::Active)) {
if (REALM_LIKELY(!m_debug_hook)) {
return SyncClientHookAction::NoAction;
}

SyncClientHookData data;
data.event = event;
data.batch_state = batch_state;
Expand All @@ -1038,13 +1032,11 @@ SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, con

SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const ProtocolErrorInfo& error_info)
{
if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
return SyncClientHookAction::NoAction;
}
if (REALM_UNLIKELY(m_state != State::Active)) {
if (REALM_LIKELY(!m_debug_hook)) {
return SyncClientHookAction::NoAction;
}


SyncClientHookData data;
data.event = event;
data.batch_state = DownloadBatchState::SteadyState;
Expand Down Expand Up @@ -1138,7 +1130,7 @@ SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr<Sub
, m_signed_access_token{std::move(config.signed_user_token)}
, m_client_reset_config{std::move(config.client_reset_config)}
, m_proxy_config{config.proxy_config} // Throws
, m_debug_hook(std::move(config.on_sync_client_event_hook))
, m_debug_hook_for_sess_impl(std::move(config.on_sync_client_event_hook))
, m_session_reason(config.session_reason)
, m_flx_subscription_store(std::move(flx_sub_store))
, m_migration_store(std::move(migration_store))
Expand Down Expand Up @@ -1519,8 +1511,9 @@ void SessionWrapper::actualize(ServerEndpoint endpoint)
was_created); // Throws
try {
// FIXME: This only makes sense when each session uses a separate connection.
conn.update_connect_info(m_http_request_path_prefix, m_signed_access_token); // Throws
std::unique_ptr<SessionImpl> sess = std::make_unique<SessionImpl>(*this, conn); // Throws
conn.update_connect_info(m_http_request_path_prefix, m_signed_access_token); // Throws
std::unique_ptr<SessionImpl> sess = std::make_unique<SessionImpl>(
*this, conn, m_client.get_next_session_ident(), std::move(m_debug_hook_for_sess_impl)); // Throws
if (sync_mode == SyncServerMode::FLX) {
m_flx_pending_bootstrap_store = std::make_unique<PendingBootstrapStore>(m_db, sess->logger);
}
Expand Down
2 changes: 2 additions & 0 deletions src/realm/sync/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ enum class SyncClientHookEvent {
ErrorMessageReceived,
SessionSuspended,
BindMessageSent,
IdentMessageSent,
ClientErrorMessageSent,
BootstrapBatchAboutToProcess,
};

Expand Down
104 changes: 57 additions & 47 deletions src/realm/sync/noinst/client_impl_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,10 +350,6 @@ void Connection::initiate_session_deactivation(Session* sess)
if (sess->m_state == Session::Deactivated) {
finish_session_deactivation(sess);
}
if (REALM_UNLIKELY(--m_num_active_sessions == 0)) {
if (m_activated && m_state == ConnectionState::disconnected)
m_on_idle->trigger();
}
}


Expand Down Expand Up @@ -403,6 +399,11 @@ void ClientImpl::Connection::finish_session_deactivation(Session* sess)
auto ident = sess->m_ident;
m_sessions.erase(ident);
m_session_history.erase(ident);

if (REALM_UNLIKELY(--m_num_active_sessions == 0)) {
if (m_activated && m_state == ConnectionState::disconnected)
m_on_idle->trigger();
}
}

void Connection::force_close()
Expand All @@ -424,20 +425,13 @@ void Connection::force_close()
m_disconnect_delay_in_progress = false;
}

// We must copy any session pointers we want to close to a vector because force_closing
// the session may remove it from m_sessions and invalidate the iterator uses to loop
// through the map. By copying to a separate vector we ensure our iterators remain valid.
std::vector<Session*> to_close;
for (auto& session_pair : m_sessions) {
if (session_pair.second->m_state == Session::State::Active) {
to_close.push_back(session_pair.second.get());
for (auto it = m_sessions.begin(); it != m_sessions.end();) {
auto cur_sess_it = it++;
if (cur_sess_it->second->m_state == Session::Active) {
cur_sess_it->second->force_close();
}
}

for (auto& sess : to_close) {
sess->force_close();
}

logger.debug("Force closed idle connection");
}

Expand Down Expand Up @@ -832,9 +826,9 @@ void Connection::handle_connection_established()
fast_reconnect = true;
}

for (auto& p : m_sessions) {
Session& sess = *p.second;
sess.connection_established(fast_reconnect); // Throws
for (auto it = m_sessions.begin(); it != m_sessions.end();) {
auto cur_sess_it = it++;
cur_sess_it->second->connection_established(fast_reconnect);
}

report_connection_state_change(ConnectionState::connected); // Throws
Expand Down Expand Up @@ -1174,8 +1168,11 @@ void Connection::disconnect(const SessionErrorInfo& info)
auto j = i++;
Session& sess = *j->second;
sess.connection_lost(); // Throws
if (sess.m_state == Session::Unactivated || sess.m_state == Session::Deactivated)
if (sess.m_state == Session::Unactivated || sess.m_state == Session::Deactivated) {
m_sessions.erase(j);
REALM_ASSERT(m_num_active_sessions);
--m_num_active_sessions;
}
}
}

Expand Down Expand Up @@ -1592,7 +1589,7 @@ void Session::on_integration_failure(const IntegrationException& error)
// Since the deactivation process has not been initiated, the UNBIND
// message cannot have been sent unless an ERROR message was received.
REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
if (m_bind_message_sent && !m_error_message_received && !m_suspended) {
ensure_enlisted_to_send(); // Throws
}
}
Expand Down Expand Up @@ -1671,7 +1668,7 @@ void Session::activate()
m_download_progress = m_progress.download;
REALM_ASSERT_3(m_last_version_available, >=, m_progress.upload.client_version);

logger.debug("last_version_available = %1", m_last_version_available); // Throws
logger.debug("last_version_available = %1", m_last_version_available); // Throws
logger.debug("progress_download_server_version = %1", m_progress.download.server_version); // Throws
logger.debug("progress_download_client_version = %1",
m_progress.download.last_integrated_client_version); // Throws
Expand Down Expand Up @@ -1710,9 +1707,6 @@ void Session::initiate_deactivation()

m_state = Deactivating;

if (!m_suspended)
m_conn.one_less_active_unsuspended_session(); // Throws

if (m_enlisted_to_send) {
REALM_ASSERT(!unbind_process_complete());
return;
Expand All @@ -1721,14 +1715,17 @@ void Session::initiate_deactivation()
// Deactivate immediately if the BIND message has not yet been sent and the
// session is not enlisted to send, or if the unbinding process has already
// completed.
if (!m_bind_message_sent || unbind_process_complete()) {
if ((!m_bind_message_sent || unbind_process_complete()) && !pending_client_error()) {
complete_deactivation(); // Throws
// Life cycle state is now Deactivated
return;
}

// Ready to send the UNBIND message, if it has not already been sent
if (!m_unbind_message_sent) {
// Ready to send the UNBIND message, if it has not already been sent, unless we've
// never sent the BIND message but still have an error message to send. In that case
// when the connection becomes connected we'll send the error message and immediately
// complete de-activation.
if (!m_unbind_message_sent && m_bind_message_sent) {
enlist_to_send(); // Throws
return;
}
Expand All @@ -1739,8 +1736,9 @@ void Session::complete_deactivation()
{
REALM_ASSERT_EX(m_state == Deactivating, m_state);
m_state = Deactivated;

logger.debug("Deactivation completed"); // Throws
if (!m_suspended)
m_conn.one_less_active_unsuspended_session(); // Throws
logger.debug("Deactivation completed"); // Throws
}


Expand All @@ -1754,11 +1752,17 @@ void Session::send_message()
REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
REALM_ASSERT(m_enlisted_to_send);
m_enlisted_to_send = false;

if (m_error_to_send) {
send_json_error_message(); // Throws
return;
}

if (m_state == Deactivating || m_error_message_received || m_suspended) {
// Deactivation has been initiated. If the UNBIND message has not been
// sent yet, there is no point in sending it. Instead, we can let the
// deactivation process complete.
if (!m_bind_message_sent) {
if (!m_bind_message_sent && !pending_client_error()) {
return complete_deactivation(); // Throws
// Life cycle state is now Deactivated
}
Expand All @@ -1777,6 +1781,12 @@ void Session::send_message()
if (!m_bind_message_sent)
return send_bind_message(); // Throws


// Stop sending upload, mark and query messages when the client detects an error.
if (m_error_message_sent) {
return;
}

if (!m_ident_message_sent) {
if (have_client_file_ident())
send_ident_message(); // Throws
Expand All @@ -1791,13 +1801,6 @@ void Session::send_message()
return send_test_command_message();
}

if (m_error_to_send)
return send_json_error_message(); // Throws

// Stop sending upload, mark and query messages when the client detects an error.
if (m_client_error) {
return;
}

if (m_target_download_mark > m_last_download_mark_sent)
return send_mark_message(); // Throws
Expand Down Expand Up @@ -1943,7 +1946,8 @@ void Session::send_ident_message()
m_conn.initiate_write_message(out, this); // Throws

m_ident_message_sent = true;

call_debug_hook(SyncClientHookEvent::IdentMessageSent, m_progress, m_last_sent_flx_query_version,
DownloadBatchState::SteadyState, 0);
// Other messages may be waiting to be sent
enlist_to_send(); // Throws
}
Expand Down Expand Up @@ -2153,29 +2157,35 @@ void Session::send_unbind_message()

void Session::send_json_error_message()
{
REALM_ASSERT_EX(m_state == Active, m_state);
REALM_ASSERT(m_ident_message_sent);
REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
REALM_ASSERT(!m_unbind_message_sent);
REALM_ASSERT(m_error_to_send);
REALM_ASSERT(m_client_error);

auto client_error = std::move(m_client_error);
ClientProtocol& protocol = m_conn.get_client_protocol();
OutputBuffer& out = m_conn.get_output_buffer();
session_ident_type session_ident = get_ident();
auto protocol_error = m_client_error->error_for_server;
auto protocol_error = static_cast<int>(client_error->error_for_server);

auto message = util::format("%1", m_client_error->to_status());
logger.info("Sending: ERROR \"%1\" (error_code=%2, session_ident=%3)", message, static_cast<int>(protocol_error),
auto message = util::format("%1", client_error->to_status());
logger.info("Sending: ERROR \"%1\" (error_code=%2, session_ident=%3)", message, protocol_error,
session_ident); // Throws

nlohmann::json error_body_json;
error_body_json["message"] = std::move(message);
protocol.make_json_error_message(out, session_ident, static_cast<int>(protocol_error),
protocol.make_json_error_message(out, session_ident, protocol_error,
error_body_json.dump()); // Throws
m_conn.initiate_write_message(out, this); // Throws

m_error_to_send = false;
enlist_to_send(); // Throws
m_error_message_sent = true;

call_debug_hook(SyncClientHookEvent::ClientErrorMessageSent,
ProtocolErrorInfo(protocol_error, message, IsFatal{false}));

if (m_state == Active && m_bind_message_sent) {
enlist_to_send(); // Throws
}
}


Expand Down Expand Up @@ -2346,7 +2356,7 @@ Status Session::receive_download_message(const SyncProgress& progress, std::uint

// Ignore download messages when the client detects an error. This is to prevent transforming the same bad
// changeset over and over again.
if (m_client_error) {
if (m_error_message_sent) {
logger.debug("Ignoring download message because the client detected an integration error");
return Status::OK();
}
Expand Down
Loading