diff --git a/src/cryptonote_core/cryptonote_core.cpp b/src/cryptonote_core/cryptonote_core.cpp index 4c6536318f4..ab20b678abc 100644 --- a/src/cryptonote_core/cryptonote_core.cpp +++ b/src/cryptonote_core/cryptonote_core.cpp @@ -193,6 +193,12 @@ namespace cryptonote , "Run a program for each new block, '%s' will be replaced by the block hash" , "" }; + static const command_line::arg_descriptor arg_sync_notify = { + "sync-notify" + , "Run a program when we start or stop syncing, '%h' will be replaced by the " + "current height, '%t' by the target height, '%s' by 1 if synced, 0 if not." + , "" + }; static const command_line::arg_descriptor arg_prune_blockchain = { "prune-blockchain" , "Prune blockchain" @@ -345,6 +351,7 @@ namespace cryptonote command_line::add_arg(desc, arg_sync_pruned_blocks); command_line::add_arg(desc, arg_max_txpool_weight); command_line::add_arg(desc, arg_block_notify); + command_line::add_arg(desc, arg_sync_notify); command_line::add_arg(desc, arg_prune_blockchain); command_line::add_arg(desc, arg_reorg_notify); command_line::add_arg(desc, arg_block_rate_notify); @@ -647,6 +654,28 @@ namespace cryptonote MERROR("Failed to parse block notify spec: " << e.what()); } + try + { + if (!command_line::is_arg_defaulted(vm, arg_sync_notify)) + { + struct sync_notify + { + tools::Notify cmdline; + + void operator()(bool syncing, std::uint64_t height, std::uint64_t target) const + { + cmdline.notify("%s", syncing ? "1" : " 0", "%h", std::to_string(height).c_str(), "%t", std::to_string(target).c_str(), NULL); + } + }; + + add_sync_notify(sync_notify{{command_line::get_arg(vm, arg_sync_notify).c_str()}}); + } + } + catch (const std::exception &e) + { + MERROR("Failed to parse sync notify spec: " << e.what()); + } + try { if (!command_line::is_arg_defaulted(vm, arg_reorg_notify)) @@ -1451,6 +1480,32 @@ namespace cryptonote m_miner.resume(); } //----------------------------------------------------------------------------------------------- + void core::add_sync_notify(boost::function&& notify) + { + if (notify) + { + m_sync_notifiers.push_back(std::move(notify)); + } + } + //----------------------------------------------------------------------------------------------- + void core::on_start_syncing(uint64_t target) + { + MINFO("Starting syncing"); + const uint64_t current_blockchain_height = get_current_blockchain_height(); + if (target >= current_blockchain_height + 5) // don't switch to unsafe mode just for a few blocks + safesyncmode(false); + for (const auto& notifier : m_sync_notifiers) + notifier(true, current_blockchain_height, target); + } + //----------------------------------------------------------------------------------------------- + void core::on_stop_syncing() + { + MINFO("Stopping syncing"); + safesyncmode(true); + for (const auto& notifier : m_sync_notifiers) + notifier(false, get_current_blockchain_height(), 0); + } + //----------------------------------------------------------------------------------------------- block_complete_entry get_block_complete_entry(block& b, tx_memory_pool &pool) { block_complete_entry bce; @@ -1537,7 +1592,12 @@ namespace cryptonote //----------------------------------------------------------------------------------------------- bool core::add_new_block(const block& b, block_verification_context& bvc) { - return m_blockchain_storage.add_new_block(b, bvc); + const bool syncing = get_current_blockchain_height() < get_target_blockchain_height(); + if (!m_blockchain_storage.add_new_block(b, bvc)) + return false; + if (syncing && get_current_blockchain_height() >= get_target_blockchain_height()) + on_stop_syncing(); + return true; } //----------------------------------------------------------------------------------------------- @@ -2025,6 +2085,11 @@ namespace cryptonote //----------------------------------------------------------------------------------------------- void core::set_target_blockchain_height(uint64_t target_blockchain_height) { + const uint64_t height = get_current_blockchain_height(); + if (m_target_blockchain_height > height && target_blockchain_height <= height) + on_stop_syncing(); + else if (target_blockchain_height > height) + on_start_syncing(target_blockchain_height); m_target_blockchain_height = target_blockchain_height; } //----------------------------------------------------------------------------------------------- diff --git a/src/cryptonote_core/cryptonote_core.h b/src/cryptonote_core/cryptonote_core.h index d2bffdaee0e..069688cd67e 100644 --- a/src/cryptonote_core/cryptonote_core.h +++ b/src/cryptonote_core/cryptonote_core.h @@ -893,6 +893,16 @@ namespace cryptonote */ bool get_txpool_complement(const std::vector &hashes, std::vector &txes); + /** + * @brief sets a sync notify object to call when we start/stop syncing + * + * @param notify the notify object to call + */ + void add_sync_notify(boost::function &¬ify); + + void on_start_syncing(uint64_t target); + void on_stop_syncing(); + private: /** @@ -1131,6 +1141,8 @@ namespace cryptonote std::shared_ptr m_block_rate_notify; boost::function)> m_zmq_pub; + + std::vector> m_sync_notifiers; }; } diff --git a/src/cryptonote_protocol/cryptonote_protocol_handler.h b/src/cryptonote_protocol/cryptonote_protocol_handler.h index 80dd2bc391b..4f5315ad35e 100644 --- a/src/cryptonote_protocol/cryptonote_protocol_handler.h +++ b/src/cryptonote_protocol/cryptonote_protocol_handler.h @@ -111,6 +111,7 @@ namespace cryptonote void log_connections(); std::list get_connections(); const block_queue &get_block_queue() const { return m_block_queue; } + bool has_more_blocks_queued() const { return m_block_queue.get_data_size() > 0; } void stop(); void on_connection_close(cryptonote_connection_context &context); void set_max_out_peers(unsigned int max) { m_max_out_peers = max; } diff --git a/src/cryptonote_protocol/cryptonote_protocol_handler.inl b/src/cryptonote_protocol/cryptonote_protocol_handler.inl index 106253082ff..91d9958964f 100644 --- a/src/cryptonote_protocol/cryptonote_protocol_handler.inl +++ b/src/cryptonote_protocol/cryptonote_protocol_handler.inl @@ -407,10 +407,6 @@ namespace cryptonote << " [Your node is " << abs_diff << " blocks (" << tools::get_human_readable_timespan((abs_diff - diff_v2) * DIFFICULTY_TARGET_V1 + diff_v2 * DIFFICULTY_TARGET_V2) << ") " << (0 <= diff ? std::string("behind") : std::string("ahead")) << "] " << ENDL << "SYNCHRONIZATION started"); - if (hshd.current_height >= m_core.get_current_blockchain_height() + 5) // don't switch to unsafe mode just for a few blocks - { - m_core.safesyncmode(false); - } if (m_core.get_target_blockchain_height() == 0) // only when sync starts { m_sync_timer.resume(); @@ -2479,7 +2475,6 @@ skip: } m_core.on_synchronized(); } - m_core.safesyncmode(true); m_p2p->clear_used_stripe_peers(); // ask for txpool complement from any suitable node if we did not yet diff --git a/src/daemon/daemon.cpp b/src/daemon/daemon.cpp index 3f1885423c9..93d40eaae9d 100644 --- a/src/daemon/daemon.cpp +++ b/src/daemon/daemon.cpp @@ -121,6 +121,7 @@ struct t_internals { { core.get().get_blockchain_storage().add_block_notify(cryptonote::listener::zmq_pub::chain_main{shared}); core.get().get_blockchain_storage().add_miner_notify(cryptonote::listener::zmq_pub::miner_data{shared}); + core.get().add_sync_notify(cryptonote::listener::zmq_pub::sync{shared}); core.get().set_txpool_listener(cryptonote::listener::zmq_pub::txpool_add{shared}); } } diff --git a/src/rpc/zmq_pub.cpp b/src/rpc/zmq_pub.cpp index 074b552078f..8882bfec7d6 100644 --- a/src/rpc/zmq_pub.cpp +++ b/src/rpc/zmq_pub.cpp @@ -57,9 +57,11 @@ namespace { constexpr const char txpool_signal[] = "tx_signal"; + constexpr const char sync_signal[] = "sync_signal"; using chain_writer = void(epee::byte_stream&, std::uint64_t, epee::span); using miner_writer = void(epee::byte_stream&, uint8_t, uint64_t, const crypto::hash&, const crypto::hash&, cryptonote::difficulty_type, uint64_t, uint64_t, const std::vector&); + using sync_writer = void(epee::byte_stream&, bool, std::uint64_t, std::uint64_t); using txpool_writer = void(epee::byte_stream&, epee::span); template @@ -132,6 +134,14 @@ namespace const std::vector& tx_backlog; }; + //! Object for sync notification serialization + struct minimal_sync + { + const bool syncing; + const std::uint64_t height; + const std::uint64_t target; + }; + //! Object for "minimal" tx serialization struct minimal_txpool { @@ -187,6 +197,17 @@ namespace dest.EndObject(); } + void toJsonValue(rapidjson::Writer& dest, const minimal_sync self) + { + namespace adapt = boost::adaptors; + + dest.StartObject(); + INSERT_INTO_JSON_OBJECT(dest, syncing, self.syncing); + INSERT_INTO_JSON_OBJECT(dest, height, self.height); + INSERT_INTO_JSON_OBJECT(dest, target, self.target); + dest.EndObject(); + } + void json_full_chain(epee::byte_stream& buf, const std::uint64_t height, const epee::span blocks) { json_pub(buf, blocks); @@ -202,6 +223,11 @@ namespace json_pub(buf, miner_data{major_version, height, prev_id, seed_hash, diff, median_weight, already_generated_coins, tx_backlog}); } + void json_minimal_sync(epee::byte_stream& buf, bool syncing, const std::uint64_t height, const std::uint64_t target) + { + json_pub(buf, minimal_sync{syncing, height, target}); + } + // boost::adaptors are in place "views" - no copy/move takes place // moving transactions (via sort, etc.), is expensive! @@ -236,6 +262,12 @@ namespace {u8"json-full-miner_data", json_miner_data}, }}; + constexpr const std::array, 2> sync_contexts = + {{ + {u8"json-full-sync", json_minimal_sync}, + {u8"json-minimal-sync", json_minimal_sync}, + }}; + constexpr const std::array, 2> txpool_contexts = {{ {u8"json-full-txpool_add", json_full_txpool}, @@ -336,7 +368,7 @@ namespace zmq_msg_size(std::addressof(msg)) }; - if (payload == txpool_signal) + if (payload == txpool_signal || payload == sync_signal) { zmq_msg_close(std::addressof(msg)); return false; @@ -360,6 +392,7 @@ zmq_pub::zmq_pub(void* context) : relay_(), chain_subs_{{0}}, miner_subs_{{0}}, + sync_subs_({0}), txpool_subs_{{0}}, sync_() { @@ -368,6 +401,7 @@ zmq_pub::zmq_pub(void* context) verify_sorted(chain_contexts, "chain_contexts"); verify_sorted(miner_contexts, "miner_contexts"); + verify_sorted(sync_contexts, "sync_contexts"); verify_sorted(txpool_contexts, "txpool_contexts"); relay_.reset(zmq_socket(context, ZMQ_PAIR)); @@ -389,12 +423,13 @@ bool zmq_pub::sub_request(boost::string_ref message) const auto chain_range = get_range(chain_contexts, message); const auto miner_range = get_range(miner_contexts, message); + const auto sync_range = get_range(sync_contexts, message); const auto txpool_range = get_range(txpool_contexts, message); - if (!chain_range.empty() || !miner_range.empty() || !txpool_range.empty()) + if (!chain_range.empty() || !miner_range.empty() || !sync_range.empty() || !txpool_range.empty()) { MDEBUG("Client " << (tag ? "subscribed" : "unsubscribed") << " to " << - chain_range.size() << " chain topic(s), " << miner_range.size() << " miner topic(s) and " << txpool_range.size() << " txpool topic(s)"); + chain_range.size() << " chain topic(s), " << miner_range.size() << " miner topic(s), " << sync_range.size() << " sync topic(s) and " << txpool_range.size() << " txpool topic(s)"); const boost::lock_guard lock{sync_}; switch (tag) @@ -402,11 +437,13 @@ bool zmq_pub::sub_request(boost::string_ref message) case 0: remove_subscriptions(chain_subs_, chain_range, chain_contexts.begin()); remove_subscriptions(miner_subs_, miner_range, miner_contexts.begin()); + remove_subscriptions(sync_subs_, sync_range, sync_contexts.begin()); remove_subscriptions(txpool_subs_, txpool_range, txpool_contexts.begin()); return true; case 1: add_subscriptions(chain_subs_, chain_range, chain_contexts.begin()); add_subscriptions(miner_subs_, miner_range, miner_contexts.begin()); + add_subscriptions(sync_subs_, sync_range, sync_contexts.begin()); add_subscriptions(txpool_subs_, txpool_range, txpool_contexts.begin()); return true; default: @@ -498,6 +535,20 @@ std::size_t zmq_pub::send_miner_data(uint8_t major_version, uint64_t height, con return 0; } +std::size_t zmq_pub::send_sync(bool syncing, std::uint64_t height, std::uint64_t target) +{ + const boost::lock_guard lock{sync_}; + for (const std::size_t sub : sync_subs_) + { + if (sub) + { + auto messages = make_pubs(sync_subs_, sync_contexts, syncing, height, target); + return send_messages(relay_.get(), messages); + } + } + return 0; +} + std::size_t zmq_pub::send_txpool_add(std::vector txes) { if (txes.empty()) @@ -537,6 +588,15 @@ void zmq_pub::miner_data::operator()(uint8_t major_version, uint64_t height, con MERROR("Unable to send ZMQ/Pub - ZMQ server destroyed"); } +void zmq_pub::sync::operator()(bool syncing, std::uint64_t height, std::uint64_t target) const +{ + const std::shared_ptr self = self_.lock(); + if (self) + self->send_sync(syncing, height, target); + else + MERROR("Unable to send ZMQ/Pub - ZMQ server destroyed"); +} + void zmq_pub::txpool_add::operator()(std::vector txes) const { const std::shared_ptr self = self_.lock(); diff --git a/src/rpc/zmq_pub.h b/src/rpc/zmq_pub.h index c636e1d7b37..d5e25c2bb87 100644 --- a/src/rpc/zmq_pub.h +++ b/src/rpc/zmq_pub.h @@ -61,6 +61,7 @@ class zmq_pub std::deque> txes_; std::array chain_subs_; std::array miner_subs_; + std::array sync_subs_; std::array txpool_subs_; boost::mutex sync_; //!< Synchronizes counts in `*_subs_` arrays. @@ -95,6 +96,11 @@ class zmq_pub \return Number of ZMQ messages sent to relay. */ std::size_t send_miner_data(uint8_t major_version, uint64_t height, const crypto::hash& prev_id, const crypto::hash& seed_hash, difficulty_type diff, uint64_t median_weight, uint64_t already_generated_coins, const std::vector& tx_backlog); + /*! Send a `ZMQ_PUB` notification when starting/stopping syncing. + Thread-safe. + \return Number of ZMQ messages sent to relay. */ + std::size_t send_sync(bool syncing, std::uint64_t height, std::uint64_t target); + /*! Send a `ZMQ_PUB` notification for new tx(es) being added to the local pool. Thread-safe. \return Number of ZMQ messages sent to relay. */ @@ -114,6 +120,13 @@ class zmq_pub void operator()(uint8_t major_version, uint64_t height, const crypto::hash& prev_id, const crypto::hash& seed_hash, difficulty_type diff, uint64_t median_weight, uint64_t already_generated_coins, const std::vector& tx_backlog) const; }; + //! Callable for `send_sync` with weak ownership to `zmq_pub` object. + struct sync + { + std::weak_ptr self_; + void operator()(bool, std::uint64_t height, std::uint64_t target) const; + }; + //! Callable for `send_txpool_add` with weak ownership to `zmq_pub` object. struct txpool_add {