diff --git a/core/dispute_coordinator/impl/dispute_coordinator_impl.cpp b/core/dispute_coordinator/impl/dispute_coordinator_impl.cpp index ccd9560033..43368fa3eb 100644 --- a/core/dispute_coordinator/impl/dispute_coordinator_impl.cpp +++ b/core/dispute_coordinator/impl/dispute_coordinator_impl.cpp @@ -1847,7 +1847,7 @@ namespace kagome::dispute { return cb(res.as_failure()); } - auto &valid_import = res.value(); + [[maybe_unused]] auto &valid_import = res.value(); return cb(outcome::success()); } diff --git a/core/parachain/CMakeLists.txt b/core/parachain/CMakeLists.txt index 3f59c0b936..a88ffd1f7c 100644 --- a/core/parachain/CMakeLists.txt +++ b/core/parachain/CMakeLists.txt @@ -25,4 +25,5 @@ target_link_libraries(validator_parachain crypto_store network erasure_coding_crust::ec-cpp + waitable_timer ) diff --git a/core/parachain/approval/approval_distribution.cpp b/core/parachain/approval/approval_distribution.cpp index 0f5b1c4c4f..36c55e7389 100644 --- a/core/parachain/approval/approval_distribution.cpp +++ b/core/parachain/approval/approval_distribution.cpp @@ -4,6 +4,7 @@ */ #include +#include #include "clock/impl/basic_waitable_timer.hpp" #include "common/visitor.hpp" @@ -17,10 +18,14 @@ #include "parachain/approval/approval_distribution.hpp" #include "parachain/approval/state.hpp" #include "primitives/authority.hpp" +#include "primitives/math.hpp" #include "runtime/runtime_api/parachain_host_types.hpp" #include "utils/async_sequence.hpp" #include "utils/weak_from_shared.hpp" +static constexpr size_t kMaxAssignmentBatchSize = 200ull; +static constexpr size_t kMaxApprovalBatchSize = 300ull; + OUTCOME_CPP_DEFINE_CATEGORY(kagome::parachain, ApprovalDistribution::Error, e) { using E = kagome::parachain::ApprovalDistribution::Error; switch (e) { @@ -52,9 +57,10 @@ OUTCOME_CPP_DEFINE_CATEGORY(kagome::parachain, ApprovalDistribution::Error, e) { return "Unknown approval-distribution error"; } -static constexpr uint64_t kTickDurationMs = 500; -static constexpr kagome::network::Tick kApprovalDelay = 2; -static constexpr kagome::network::Tick kTickTooFarInFuture = 20; // 10 seconds. +static constexpr uint64_t kTickDurationMs = 500ull; +static constexpr kagome::network::Tick kApprovalDelay = 2ull; +static constexpr kagome::network::Tick kTickTooFarInFuture = + 20ull; // 10 seconds. namespace { @@ -262,9 +268,8 @@ namespace { auto const is_no_show = !has_approved && no_show_at <= drifted_tick_now; if (!is_no_show && !has_approved) { - next_no_show = - std::min(no_show_at + clock_drift, - next_no_show ? *next_no_show : uint64_t{0ull}); + next_no_show = kagome::parachain::approval::min_or_some( + next_no_show, std::make_optional(no_show_at + clock_drift)); } if (is_no_show) { ++no_shows; @@ -527,6 +532,21 @@ namespace kagome::parachain { } }); + remote_view_sub_ = std::make_shared( + peer_view_->getRemoteViewObservable(), false); + remote_view_sub_->subscribe(remote_view_sub_->generateSubscriptionSetId(), + network::PeerView::EventType::kViewUpdated); + remote_view_sub_->setCallback( + [wptr{weak_from_this()}](auto /*set_id*/, + auto && /*internal_obj*/, + auto /*event_type*/, + const libp2p::peer::PeerId &peer_id, + const network::View &view) { + if (auto self = wptr.lock()) { + self->store_remote_view(peer_id, view); + } + }); + chain_sub_ = std::make_shared( peer_view_->intoChainEventsEngine()); chain_sub_->subscribe( @@ -546,9 +566,38 @@ namespace kagome::parachain { internal_context_->start(); thread_pool_context_->start(); this_context_.start(); + + /// TODO(iceseer): clear `known_by` when peer disconnected + return true; } + void ApprovalDistribution::store_remote_view( + const libp2p::peer::PeerId &_peer_id, const network::View &_view) { + REINVOKE_2( + *internal_context_, store_remote_view, _peer_id, _view, peer_id, view); + + primitives::BlockNumber old_finalized_number{0ull}; + if (auto it = peer_views_.find(peer_id); it != peer_views_.end()) { + old_finalized_number = it->second.finalized_number_; + } + + for (primitives::BlockNumber bn = old_finalized_number; + bn <= view.finalized_number_; + ++bn) { + if (auto it = blocks_by_number_.find(bn); it != blocks_by_number_.end()) { + for (const auto &bh : it->second) { + if (auto opt_entry = storedDistribBlockEntries().get(bh)) { + opt_entry->get().known_by.erase(peer_id); + } + } + } + } + + unify_with_peer(storedDistribBlockEntries(), peer_id, view); + peer_views_[peer_id] = std::move(view); + } + void ApprovalDistribution::clearCaches( const primitives::events::ChainEventParams &ev) { REINVOKE_1(*internal_context_, clearCaches, ev, event); @@ -556,22 +605,34 @@ namespace kagome::parachain { if (const auto value = if_type( event)) { - for (const auto &lost : value->get()) { - SL_TRACE(logger_, - "Cleaning up stale pending messages.(block hash={})", - lost); - pending_known_.erase(lost); - active_tranches_.erase(lost); - - if (auto block_entry = storedBlockEntries().get(lost)) { - for (const auto &candidate : block_entry->get().candidates) { - recovery_->remove(candidate.second); - storedCandidateEntries().extract(candidate.second); + approvals_cache_.exclusiveAccess([&](auto &approvals_cache) { + for (const auto &lost : value->get()) { + SL_TRACE(logger_, + "Cleaning up stale pending messages.(block hash={})", + lost); + pending_known_.erase(lost); + active_tranches_.erase(lost); + approving_context_map_.erase(lost); + /// TODO(iceseer): `blocks_by_number_` clear on finalization + + if (auto block_entry = storedBlockEntries().get(lost)) { + for (const auto &candidate : block_entry->get().candidates) { + recovery_->remove(candidate.second); + storedCandidateEntries().extract(candidate.second); + if (auto it_cached = approvals_cache.find(candidate.second); + it_cached != approvals_cache.end()) { + ApprovalCache &approval_cache = it_cached->second; + approval_cache.blocks_.erase(lost); + if (approval_cache.blocks_.empty()) { + approvals_cache.erase(it_cached); + } + } + } + storedBlockEntries().extract(lost); } - storedBlockEntries().extract(lost); + storedDistribBlockEntries().extract(lost); } - storedDistribBlockEntries().extract(lost); - } + }); } } @@ -588,7 +649,6 @@ namespace kagome::parachain { return std::make_pair((ValidatorIndex)ix, std::move(res.value())); } } - SL_TRACE(logger_, "No assignment key"); return std::nullopt; } @@ -600,22 +660,9 @@ namespace kagome::parachain { const CandidateIncludedList &leaving_cores) { if (config.n_cores == 0 || config.assignment_keys.empty() || config.validator_groups.empty()) { - SL_TRACE(logger_, - "Not producing assignments because config is degenerate " - "(n_cores:{}, has_assignment_keys:{}, has_validator_groups:{})", - config.n_cores, - config.assignment_keys.size(), - config.validator_groups.size()); return {}; } - SL_INFO(logger_, - "Compute assignments." - "(n_cores:{}, has_assignment_keys:{}, has_validator_groups:{})", - config.n_cores, - config.assignment_keys.size(), - config.validator_groups.size()); - std::optional> founded_key = findAssignmentKey(keystore, config); if (!founded_key) { @@ -692,14 +739,12 @@ namespace kagome::parachain { std::move(babe_config); acu.second.included_candidates = std::move(included); - acu.second.session_index = session_index; - acu.second.session_info = std::move(session_info); acu.second.babe_epoch = epoch_number; acu.second.babe_block_header = std::move(babe_block_header); acu.second.authorities = std::move(authorities); acu.second.randomness = std::move(randomness); - this->try_process_approving_context(acu); + this->try_process_approving_context(acu, session_index, session_info); }); } @@ -716,7 +761,9 @@ namespace kagome::parachain { } void ApprovalDistribution::try_process_approving_context( - ApprovalDistribution::ApprovingContextUnit &acu) { + ApprovalDistribution::ApprovingContextUnit &acu, + SessionIndex session_index, + const runtime::SessionInfo &session_info) { ApprovingContext &ac = acu.second; if (!ac.is_complete()) { return; @@ -744,19 +791,19 @@ namespace kagome::parachain { logger_->warn("Relay VRF return error.(error={})", res.error().message()); return; } + auto assignments = compute_assignments( - keystore_, *ac.session_info, relay_vrf, *ac.included_candidates); + keystore_, session_info, relay_vrf, *ac.included_candidates); /// TODO(iceseer): force approve impl ac.complete_callback(ImportedBlockInfo{ .included_candidates = std::move(*ac.included_candidates), - .session_index = *ac.session_index, + .session_index = session_index, .assignments = std::move(assignments), - .n_validators = ac.session_info->validators.size(), + .n_validators = session_info.validators.size(), .relay_vrf_story = relay_vrf, .slot = unsafe_vrf.slot, - .session_info = std::move(*ac.session_info), .force_approve = std::nullopt}); } @@ -766,7 +813,30 @@ namespace kagome::parachain { kagome::parachain::approval::ApprovalStatus>> ApprovalDistribution::approval_status(const BlockEntry &block_entry, CandidateEntry &candidate_entry) { - auto &session_info = block_entry.session_info; + std::optional opt_session_info{}; + if (auto session_info_res = parachain_host_->session_info( + block_entry.parent_hash, block_entry.session); + session_info_res.has_value()) { + opt_session_info = std::move(session_info_res.value()); + } else { + logger_->warn( + "Approval status. Session info runtime request failed. " + "(block_hash={}, session_index={}, error={})", + block_entry.parent_hash, + block_entry.session, + session_info_res.error().message()); + return std::nullopt; + } + + if (!opt_session_info) { + logger_->debug( + "Can't obtain SessionInfo. (parent_hash={}, session_index={})", + block_entry.parent_hash, + block_entry.session); + return std::nullopt; + } + + runtime::SessionInfo &session_info = *opt_session_info; const auto block_hash = block_entry.block_hash; const auto tranche_now = @@ -868,7 +938,7 @@ namespace kagome::parachain { const primitives::BlockHash &block_hash, const primitives::BlockHash &parent_hash, scale::BitVec &&approved_bitfield, - ImportedBlockInfo &&block_info) { + const ImportedBlockInfo &block_info) { std::vector> entries; std::vector> candidates; if (auto blocks = storedBlocks().get_or_create(block_number); @@ -882,7 +952,7 @@ namespace kagome::parachain { candidates.reserve(block_info.included_candidates.size()); for (const auto &[candidateHash, candidateReceipt, coreIndex, groupIndex] : block_info.included_candidates) { - std::optional> assignment{}; + std::optional> assignment{}; if (auto assignment_it = block_info.assignments.find(coreIndex); assignment_it != block_info.assignments.end()) { assignment = assignment_it->second; @@ -912,7 +982,6 @@ namespace kagome::parachain { .parent_hash = parent_hash, .block_number = block_number, .session = block_info.session_index, - .session_info = std::move(block_info.session_info), .slot = block_info.slot, .relay_vrf_story = std::move(block_info.relay_vrf_story), .candidates = std::move(candidates), @@ -927,6 +996,7 @@ namespace kagome::parachain { primitives::BlockNumber block_number, const primitives::BlockHash &block_hash, const primitives::BlockHash &parent_hash, + primitives::BlockNumber finalized_block_number, ApprovalDistribution::ImportedBlockInfo &&imported_block) { SL_TRACE(logger_, "Star imported block processing. (block number={}, block hash={}, " @@ -935,33 +1005,49 @@ namespace kagome::parachain { block_hash, parent_hash); + OUTCOME_TRY(session_info, + parachain_host_->session_info(block_hash, + imported_block.session_index)); + + if (!session_info) { + SL_TRACE(logger_, + "No session info. (block number={}, block hash={}, " + "parent hash={}, session index={})", + block_number, + block_hash, + parent_hash, + imported_block.session_index); + return Error::NO_SESSION_INFO; + } + const auto block_tick = slotNumberToTick(config_.slot_duration_millis, imported_block.slot); - const auto no_show_duration = - slotNumberToTick(config_.slot_duration_millis, - imported_block.session_info.no_show_slots); + const auto no_show_duration = slotNumberToTick(config_.slot_duration_millis, + session_info->no_show_slots); - const auto needed_approvals = imported_block.session_info.needed_approvals; + const auto needed_approvals = session_info->needed_approvals; const auto num_candidates = imported_block.included_candidates.size(); scale::BitVec approved_bitfield; size_t num_ones = 0ull; if (0 == needed_approvals) { - SL_TRACE(logger_, "Auto-approving all candidates: {}", block_hash); + SL_TRACE(logger_, "Insta-approving all candidates. {}", block_hash); approved_bitfield.bits.insert( - approved_bitfield.bits.begin(), num_candidates, true); + approved_bitfield.bits.end(), num_candidates, true); num_ones = num_candidates; } else { approved_bitfield.bits.insert( - approved_bitfield.bits.begin(), num_candidates, false); + approved_bitfield.bits.end(), num_candidates, false); for (size_t ix = 0; ix < imported_block.included_candidates.size(); ++ix) { const auto &[_0, _1, _2, backing_group] = imported_block.included_candidates[ix]; - const auto backing_group_size = - imported_block.session_info.validator_groups[backing_group].size(); + const size_t backing_group_size = + ((backing_group < session_info->validator_groups.size()) + ? session_info->validator_groups[backing_group].size() + : 0ull); if (math::sat_sub_unsigned(imported_block.n_validators, backing_group_size) < needed_approvals) { @@ -988,21 +1074,23 @@ namespace kagome::parachain { block_hash, parent_hash, std::move(approved_bitfield), - std::move(imported_block))); + imported_block)); std::vector candidates; for (const auto &[hash, _0, _1, _2] : imported_block.included_candidates) { candidates.emplace_back(hash); } - runNewBlocks(approval::BlockApprovalMeta{ - .hash = block_hash, - .number = block_number, - .parent_hash = parent_hash, - .candidates = std::move(candidates), - .slot = imported_block.slot, - .session = imported_block.session_index, - }); + runNewBlocks( + approval::BlockApprovalMeta{ + .hash = block_hash, + .number = block_number, + .parent_hash = parent_hash, + .candidates = std::move(candidates), + .slot = imported_block.slot, + .session = imported_block.session_index, + }, + finalized_block_number); return BlockImportedCandidates{.block_hash = block_hash, .block_number = block_number, @@ -1011,7 +1099,9 @@ namespace kagome::parachain { .imported_candidates = std::move(entries)}; } - void ApprovalDistribution::runNewBlocks(approval::BlockApprovalMeta &&meta) { + void ApprovalDistribution::runNewBlocks( + approval::BlockApprovalMeta &&meta, + primitives::BlockNumber finalized_block_number) { std::optional new_hash; if (!storedDistribBlockEntries().get(meta.hash)) { const auto candidates_count = meta.candidates.size(); @@ -1022,34 +1112,60 @@ namespace kagome::parachain { storedDistribBlockEntries().set(meta.hash, DistribBlockEntry{ .candidates = std::move(candidates), + .knowledge = {}, + .known_by = {}, + .number = meta.number, + .parent_hash = meta.parent_hash, }); - } - - logger_->trace("Got new block.(hash={})", new_hash); - /// TODO(iceseer): intersection in views + blocks_by_number_[meta.number].insert(meta.hash); + } + + internal_context_->execute([wself{weak_from_this()}, + new_hash, + finalized_block_number, + meta{std::move(meta)}]() { + if (auto self = wself.lock()) { + SL_TRACE(self->logger_, "Got new block.(hash={})", new_hash); + for (const auto &[peed_id, view] : self->peer_views_) { + for (const auto &h : view.heads_) { + if (h == meta.hash) { + self->unify_with_peer( + self->storedDistribBlockEntries(), + peed_id, + network::View{ + .heads_ = {h}, + .finalized_number_ = finalized_block_number, + }); + } + } + } - for (auto it = pending_known_.begin(); it != pending_known_.end();) { - if (!storedDistribBlockEntries().get(it->first)) { - ++it; - } else { - logger_->trace("Processing pending assignment/approvals.(count={})", - it->second.size()); - for (auto i = it->second.begin(); i != it->second.end(); ++i) { - visit_in_place( - i->second, - [&](const network::Assignment &assignment) { - import_and_circulate_assignment( - i->first, - assignment.indirect_assignment_cert, - assignment.candidate_ix); - }, - [&](const network::IndirectSignedApprovalVote &approval) { - import_and_circulate_approval(i->first, approval); - }); + for (auto it = self->pending_known_.begin(); + it != self->pending_known_.end();) { + if (!self->storedDistribBlockEntries().get(it->first)) { + ++it; + } else { + self->logger_->trace( + "Processing pending assignment/approvals.(count={})", + it->second.size()); + for (auto i = it->second.begin(); i != it->second.end(); ++i) { + visit_in_place( + i->second, + [&](const network::Assignment &assignment) { + self->import_and_circulate_assignment( + i->first, + assignment.indirect_assignment_cert, + assignment.candidate_ix); + }, + [&](const network::IndirectSignedApprovalVote &approval) { + self->import_and_circulate_approval(i->first, approval); + }); + } + it = self->pending_known_.erase(it); + } } - it = pending_known_.erase(it); } - } + }); } template @@ -1060,11 +1176,6 @@ namespace kagome::parachain { ->get_executor() .running_in_this_thread()); - /// clear unuseful heads - for (const auto &l_head : updated.lost) { - approving_context_map_.erase(l_head); - } - const auto block_number = updated.new_head.number; auto parent_hash{updated.new_head.parent_hash}; if (approving_context_map_.count(head) != 0ull) { @@ -1077,15 +1188,14 @@ namespace kagome::parachain { ApprovingContext{ .block_header = updated.new_head, .included_candidates = std::nullopt, - .session_index = std::nullopt, .babe_block_header = std::nullopt, .babe_epoch = std::nullopt, - .session_info = std::nullopt, .complete_callback_context = internal_context_->io_context(), .complete_callback = [wself{weak_from_this()}, block_hash{head}, block_number, + finalized_block_number{updated.view.finalized_number_}, parent_hash{std::move(parent_hash)}, func(std::forward(func))]( outcome::result &&block_info) mutable { @@ -1105,6 +1215,7 @@ namespace kagome::parachain { block_number, block_hash, parent_hash, + finalized_block_number, std::move(block_info.value()))); }}); @@ -1228,6 +1339,13 @@ namespace kagome::parachain { candidate_receipt, validation_code); + self->approvals_cache_.exclusiveAccess([&](auto &approvals_cache) { + if (auto it = approvals_cache.find(candidate_hash); + it != approvals_cache.end()) { + ApprovalCache &ac = it->second; + ac.approval_result = outcome; + } + }); if (ApprovalOutcome::Approved == outcome) { self->issue_approval( candidate_hash, validator_index, relay_block_hash); @@ -1280,7 +1398,31 @@ namespace kagome::parachain { GET_OPT_VALUE_OR_EXIT(block_entry, AssignmentCheckResult::Bad, storedBlockEntries().get(assignment.block_hash)); - auto &session_info = block_entry.session_info; + + std::optional opt_session_info{}; + if (auto session_info_res = parachain_host_->session_info( + block_entry.parent_hash, block_entry.session); + session_info_res.has_value()) { + opt_session_info = std::move(session_info_res.value()); + } else { + logger_->warn( + "Assignment. Session info runtime request failed. (parent_hash={}, " + "session_index={}, error={})", + block_entry.parent_hash, + block_entry.session, + session_info_res.error().message()); + return AssignmentCheckResult::Bad; + } + + if (!opt_session_info) { + logger_->debug( + "Can't obtain SessionInfo. (parent_hash={}, session_index={})", + block_entry.parent_hash, + block_entry.session); + return AssignmentCheckResult::Bad; + } + + runtime::SessionInfo &session_info = *opt_session_info; if (candidate_index >= block_entry.candidates.size()) { logger_->warn( "Candidate index more than candidates array.(candidate index={})", @@ -1360,7 +1502,30 @@ namespace kagome::parachain { block_entry, ApprovalCheckResult::Bad, storedBlockEntries().get(approval.payload.payload.block_hash)); - auto &session_info = block_entry.session_info; + std::optional opt_session_info{}; + if (auto session_info_res = parachain_host_->session_info( + approval.payload.payload.block_hash, block_entry.session); + session_info_res.has_value()) { + opt_session_info = std::move(session_info_res.value()); + } else { + logger_->warn( + "Approval. Session info runtime request failed. (block_hash={}, " + "session_index={}, error={})", + approval.payload.payload.block_hash, + block_entry.session, + session_info_res.error().message()); + return ApprovalCheckResult::Bad; + } + + if (!opt_session_info) { + logger_->debug( + "Can't obtain SessionInfo. (parent_hash={}, session_index={})", + approval.payload.payload.block_hash, + block_entry.session); + return ApprovalCheckResult::Bad; + } + + runtime::SessionInfo &session_info = *opt_session_info; if (approval.payload.payload.candidate_index >= block_entry.candidates.size()) { logger_->warn( @@ -1470,35 +1635,106 @@ namespace kagome::parachain { } } + auto message_subject{ + std::make_tuple(block_hash, claimed_candidate_index, validator_index)}; + auto message_kind{approval::MessageKind::Assignment}; + if (source) { - /// TODO(iceseer): vector-clock for knowledge + const auto &peer_id = source->get(); + + if (auto it = entry.known_by.find(peer_id); it != entry.known_by.end()) { + if (auto &peer_knowledge = it->second; + peer_knowledge.contains(message_subject, message_kind)) { + if (!peer_knowledge.received.insert(message_subject, message_kind)) { + SL_TRACE(logger_, + "Duplicate assignment. (peer id={}, block_hash={}, " + "candidate index={}, validator index={})", + peer_id, + std::get<0>(message_subject), + std::get<1>(message_subject), + std::get<2>(message_subject)); + } + return; + } + } else { + SL_WARN(logger_, + "Assignment from a peer is out of view. (peer id={}, " + "block_hash={}, candidate index={}, validator index={})", + peer_id, + std::get<0>(message_subject), + std::get<1>(message_subject), + std::get<2>(message_subject)); + } + + /// if the assignment is known to be valid, reward the peer + if (entry.knowledge.contains(message_subject, message_kind)) { + /// TODO(iceseer): modify reputation + if (auto it = entry.known_by.find(peer_id); + it != entry.known_by.end()) { + SL_TRACE(logger_, "Known assignment. (peer id={})", peer_id); + it->second.received.insert(message_subject, message_kind); + } + } + switch ( check_and_import_assignment(assignment, claimed_candidate_index)) { - case AssignmentCheckResult::Accepted: - break; - case AssignmentCheckResult::Bad: + case AssignmentCheckResult::Accepted: { + SL_TRACE(logger_, + "Assignment accepted. (peer id={}, block hash={})", + source->get(), + block_hash); + entry.knowledge.known_messages[message_subject] = message_kind; + if (auto it = entry.known_by.find(peer_id); + it != entry.known_by.end()) { + it->second.received.insert(message_subject, message_kind); + } + } break; + case AssignmentCheckResult::Bad: { SL_WARN(logger_, "Got bad assignment from peer. (peer id={}, block hash={})", source->get(), block_hash); + } return; - case AssignmentCheckResult::TooFarInFuture: + case AssignmentCheckResult::TooFarInFuture: { SL_TRACE( logger_, "Got an assignment too far in the future. (peer id={}, block " "hash={})", source->get(), block_hash); + } return; - case AssignmentCheckResult::AcceptedDuplicate: + case AssignmentCheckResult::AcceptedDuplicate: { + if (auto it = entry.known_by.find(peer_id); + it != entry.known_by.end()) { + auto &peer_knowledge = it->second; + peer_knowledge.received.insert(message_subject, message_kind); + } SL_TRACE(logger_, - "Got duplicated assignment. (peer id={}, block hash={})", + "Got an `AcceptedDuplicate` assignment. (peer id={}, block " + "hash={})", source->get(), block_hash); + } return; } } else { - /// TODO(iceseer): vector-clock for knowledge + if (!entry.knowledge.insert(message_subject, message_kind)) { + SL_WARN(logger_, + "Importing locally an already known assignment. " + "(block_hash={}, candidate index={}, validator index={})", + std::get<0>(message_subject), + std::get<1>(message_subject), + std::get<2>(message_subject)); + return; + } + SL_TRACE(logger_, + "Importing locally a new assignment. (block_hash={}, candidate " + "index={}, validator index={})", + std::get<0>(message_subject), + std::get<1>(message_subject), + std::get<2>(message_subject)); } const auto local = !source; @@ -1511,7 +1747,31 @@ namespace kagome::parachain { }) .first->second; - runDistributeAssignment(assignment, claimed_candidate_index); + auto peer_filter = [&](const auto &peer, const auto &peer_kn) { + if (source && peer == source->get()) { + return false; + } + + const bool already_sent = + peer_kn.sent.contains(message_subject, + approval::MessageKind::Assignment) + || peer_kn.sent.contains(message_subject, + approval::MessageKind::Approval); + return !already_sent; + }; + + std::unordered_set peers{}; + for (auto &[peer_id, peer_knowledge] : entry.known_by) { + if (peer_filter(peer_id, peer_knowledge)) { + peers.insert(peer_id); + peer_knowledge.sent.insert(message_subject, message_kind); + } + } + + if (!peers.empty()) { + runDistributeAssignment( + assignment, claimed_candidate_index, std::move(peers)); + } } void ApprovalDistribution::import_and_circulate_approval( @@ -1566,20 +1826,100 @@ namespace kagome::parachain { } } + auto message_subject{ + std::make_tuple(block_hash, candidate_index, validator_index)}; + auto message_kind{approval::MessageKind::Approval}; + if (source) { - /// TODO(iceseer): vector-clock for knowledge + const auto &peer_id = source->get(); + if (!entry.knowledge.contains(message_subject, + approval::MessageKind::Assignment)) { + SL_TRACE(logger_, + "Unknown approval assignment. (peer id={}, block hash={}, " + "candidate={}, validator={})", + peer_id, + std::get<0>(message_subject), + std::get<1>(message_subject), + std::get<2>(message_subject)); + return; + } + + // check if our knowledge of the peer already contains this approval + if (auto it = entry.known_by.find(peer_id); it != entry.known_by.end()) { + if (auto &peer_knowledge = it->second; + peer_knowledge.contains(message_subject, message_kind)) { + if (!peer_knowledge.received.insert(message_subject, message_kind)) { + SL_TRACE(logger_, + "Duplicate approval. (peer id={}, block_hash={}, " + "candidate index={}, validator index={})", + peer_id, + std::get<0>(message_subject), + std::get<1>(message_subject), + std::get<2>(message_subject)); + } + return; + } + } else { + SL_TRACE(logger_, + "Approval from a peer is out of view. (peer id={}, " + "block_hash={}, candidate index={}, validator index={})", + peer_id, + std::get<0>(message_subject), + std::get<1>(message_subject), + std::get<2>(message_subject)); + } + + /// if the approval is known to be valid, reward the peer + if (entry.knowledge.contains(message_subject, message_kind)) { + SL_TRACE(logger_, + "Known approval. (peer id={}, block hash={}, " + "candidate={}, validator={})", + peer_id, + std::get<0>(message_subject), + std::get<1>(message_subject), + std::get<2>(message_subject)); + + if (auto it = entry.known_by.find(peer_id); + it != entry.known_by.end()) { + it->second.received.insert(message_subject, message_kind); + } + return; + } + switch (check_and_import_approval(vote)) { - case ApprovalCheckResult::Accepted: - break; - case ApprovalCheckResult::Bad: + case ApprovalCheckResult::Accepted: { + entry.knowledge.insert(message_subject, message_kind); + if (auto it = entry.known_by.find(peer_id); + it != entry.known_by.end()) { + it->second.received.insert(message_subject, message_kind); + } + } break; + case ApprovalCheckResult::Bad: { logger_->warn( - "Got bad approval from peer. (peer id={}, block hash={})", + "Got a bad approval from peer. (peer id={}, block hash={})", source->get(), block_hash); + } return; } } else { - /// TODO(iceseer): vector-clock for knowledge + if (!entry.knowledge.insert(message_subject, message_kind)) { + // if we already imported an approval, there is no need to distribute it + // again + SL_WARN(logger_, + "Importing locally an already known approval. " + "(block_hash={}, candidate index={}, validator index={})", + std::get<0>(message_subject), + std::get<1>(message_subject), + std::get<2>(message_subject)); + return; + } + SL_TRACE(logger_, + "Importing locally a new approval. (block_hash={}, candidate " + "index={}, validator index={})", + std::get<0>(message_subject), + std::get<1>(message_subject), + std::get<2>(message_subject)); } if (auto it = candidate_entry.messages.find(validator_index); @@ -1598,7 +1938,26 @@ namespace kagome::parachain { validator_index); return; } - runDistributeApproval(vote); + + auto peer_filter = [&](const auto &peer, const auto &peer_kn) { + if (source && peer == source->get()) { + return false; + } + return peer_kn.sent.contains(message_subject, + approval::MessageKind::Assignment); + }; + + std::unordered_set peers{}; + for (auto &[peer_id, peer_knowledge] : entry.known_by) { + if (peer_filter(peer_id, peer_knowledge)) { + peers.insert(peer_id); + peer_knowledge.sent.insert(message_subject, message_kind); + } + } + + if (!peers.empty()) { + runDistributeApproval(vote, std::move(peers)); + } } void ApprovalDistribution::getApprovalSignaturesForCandidate( @@ -1750,13 +2109,16 @@ namespace kagome::parachain { void ApprovalDistribution::runDistributeAssignment( const approval::IndirectAssignmentCert &_indirect_cert, - CandidateIndex _candidate_index) { - REINVOKE_2(this_context_, + CandidateIndex _candidate_index, + std::unordered_set &&_peers) { + REINVOKE_3(this_context_, runDistributeAssignment, _indirect_cert, _candidate_index, + _peers, indirect_cert, - candidate_index); + candidate_index, + peers); logger_->info( "Distributing assignment on candidate (block hash={}, candidate " @@ -1767,34 +2129,102 @@ namespace kagome::parachain { auto se = pm_->getStreamEngine(); BOOST_ASSERT(se); - se->broadcast(router_->getValidationProtocol(), - std::make_shared< - network::WireMessage>( - network::ApprovalDistributionMessage{network::Assignments{ - .assignments = {network::Assignment{ - .indirect_assignment_cert = indirect_cert, - .candidate_ix = candidate_index, - }}}})); + se->broadcast( + router_->getValidationProtocol(), + std::make_shared< + network::WireMessage>( + network::ApprovalDistributionMessage{network::Assignments{ + .assignments = {network::Assignment{ + .indirect_assignment_cert = indirect_cert, + .candidate_ix = candidate_index, + }}}}), + [&](const libp2p::peer::PeerId &p) { return peers.count(p) != 0ull; }); + } + + void ApprovalDistribution::send_assignments_batched( + std::deque &&_assignments, + const libp2p::peer::PeerId &_peer_id) { + REINVOKE_2(this_context_, + send_assignments_batched, + _assignments, + _peer_id, + assignments, + peer_id); + + auto se = pm_->getStreamEngine(); + BOOST_ASSERT(se); // kMaxAssignmentBatchSize + + while (!assignments.empty()) { + auto begin = assignments.begin(); + auto end = (assignments.size() > kMaxAssignmentBatchSize) + ? assignments.begin() + kMaxAssignmentBatchSize + : assignments.end(); + + auto msg = std::make_shared< + network::WireMessage>( + network::ApprovalDistributionMessage{network::Assignments{ + .assignments = std::vector(begin, end), + }}); + + se->send(peer_id, router_->getValidationProtocol(), msg); + assignments.erase(begin, end); + } + } + + void ApprovalDistribution::send_approvals_batched( + std::deque &&_approvals, + const libp2p::peer::PeerId &_peer_id) { + REINVOKE_2(this_context_, + send_approvals_batched, + _approvals, + _peer_id, + approvals, + peer_id); + + auto se = pm_->getStreamEngine(); + BOOST_ASSERT(se); // kMaxApprovalBatchSize + + while (!approvals.empty()) { + auto begin = approvals.begin(); + auto end = (approvals.size() > kMaxApprovalBatchSize) + ? approvals.begin() + kMaxApprovalBatchSize + : approvals.end(); + + auto msg = std::make_shared< + network::WireMessage>( + network::ApprovalDistributionMessage{network::Approvals{ + .approvals = + std::vector(begin, end), + }}); + + se->send(peer_id, router_->getValidationProtocol(), msg); + approvals.erase(begin, end); + } } void ApprovalDistribution::runDistributeApproval( - const network::IndirectSignedApprovalVote &_vote) { - REINVOKE_1(this_context_, runDistributeApproval, _vote, vote); + const network::IndirectSignedApprovalVote &_vote, + std::unordered_set &&_peers) { + REINVOKE_2( + this_context_, runDistributeApproval, _vote, _peers, vote, peers); logger_->info( - "Distributing our approval vote on candidate (block={}, index={})", + "Sending an approval to peers. (block={}, index={}, num peers={})", vote.payload.payload.block_hash, - vote.payload.payload.candidate_index); + vote.payload.payload.candidate_index, + peers.size()); auto se = pm_->getStreamEngine(); BOOST_ASSERT(se); - se->broadcast(router_->getValidationProtocol(), - std::make_shared< - network::WireMessage>( - network::ApprovalDistributionMessage{network::Approvals{ - .approvals = {vote}, - }})); + se->broadcast( + router_->getValidationProtocol(), + std::make_shared< + network::WireMessage>( + network::ApprovalDistributionMessage{network::Approvals{ + .approvals = {vote}, + }}), + [&](const libp2p::peer::PeerId &p) { return peers.count(p) != 0ull; }); } void ApprovalDistribution::issue_approval(const CandidateHash &can_hash, @@ -1826,7 +2256,30 @@ namespace kagome::parachain { return; } - auto &session_info = block_entry.session_info; + std::optional opt_session_info{}; + if (auto session_info_res = parachain_host_->session_info( + block_entry.parent_hash, block_entry.session); + session_info_res.has_value()) { + opt_session_info = std::move(session_info_res.value()); + } else { + logger_->warn( + "Issue approval. Session info runtime request failed. " + "(block_hash={}, session_index={}, error={})", + block_entry.parent_hash, + block_entry.session, + session_info_res.error().message()); + return; + } + + if (!opt_session_info) { + logger_->debug( + "Can't obtain SessionInfo. (parent_hash={}, session_index={})", + block_entry.parent_hash, + block_entry.session); + return; + } + + runtime::SessionInfo &session_info = *opt_session_info; if (*candidate_index >= block_entry.candidates.size()) { logger_->warn( "Received malformed request to approve out-of-bounds candidate index " @@ -1926,14 +2379,35 @@ namespace kagome::parachain { import_and_circulate_assignment( std::nullopt, indirect_cert, candidate_index); - /// TODO(iceseer): make cache by 'candidate_hash' - launch_approval(relay_block_hash, - candidate_hash, - session, - candidate, - validator_index, - block_hash, - backing_group); + + std::optional approval_state = + approvals_cache_.exclusiveAccess( + [&](auto &approvals_cache) -> std::optional { + if (auto it = approvals_cache.find(candidate_hash); + it != approvals_cache.end()) { + it->second.blocks_.insert(relay_block_hash); + return it->second.approval_result; + } + approvals_cache.emplace( + candidate_hash, + ApprovalCache{ + .blocks_ = {relay_block_hash}, + .approval_result = ApprovalOutcome::Failed, + }); + return std::nullopt; + }); + + if (!approval_state) { + launch_approval(relay_block_hash, + candidate_hash, + session, + candidate, + validator_index, + block_hash, + backing_group); + } else if (*approval_state == ApprovalOutcome::Approved) { + issue_approval(candidate_hash, validator_index, block_hash); + } } void ApprovalDistribution::schedule_wakeup_action( @@ -2154,13 +2628,6 @@ namespace kagome::parachain { const auto ms_wakeup_after = math::sat_sub_unsigned(ms_wakeup, ms_now); auto &target_block = active_tranches_[block_hash]; - auto target_candidate = target_block.find(candidate_hash); - if (target_candidate != target_block.end()) { - if (target_candidate->second.first <= tick) { - return; - } - } - SL_TRACE(logger_, "Scheduling wakeup. (block_hash={}, candidate_hash={}, " "block_number={}, tick={}, after={})", @@ -2183,26 +2650,17 @@ namespace kagome::parachain { .running_in_this_thread()); if (auto target_block_it = self->active_tranches_.find(block_hash); target_block_it != self->active_tranches_.end()) { - auto &target_block = target_block_it->second; - if (auto target_candidate_it = target_block.find(candidate_hash); - target_candidate_it != target_block.end()) { - t = std::move(target_candidate_it->second.second); - target_block.erase(target_candidate_it); - - if (ec) { - SL_ERROR(self->logger_, - "error happened while waiting on tranche the " - "timer: {}", - ec.message()); - return; - } - self->handleTranche(block_hash, block_number, candidate_hash); + if (ec) { + SL_TRACE(self->logger_, + "Tranche operation waiting failed timer: {}", + ec.message()); + return; } + self->handleTranche(block_hash, block_number, candidate_hash); } } }); - target_block.insert_or_assign(candidate_hash, - std::make_pair(tick, std::move(t))); + target_block[candidate_hash].emplace_back(tick, std::move(t)); } void ApprovalDistribution::handleTranche( @@ -2223,8 +2681,30 @@ namespace kagome::parachain { auto &block_entry = opt_block_entry->get(); auto &candidate_entry = opt_candidate_entry->get(); - auto &session_info = opt_block_entry->get().session_info; + std::optional opt_session_info{}; + if (auto session_info_res = parachain_host_->session_info( + block_entry.parent_hash, block_entry.session); + session_info_res.has_value()) { + opt_session_info = std::move(session_info_res.value()); + } else { + logger_->warn( + "Handle tranche. Session info runtime request failed. " + "(block_hash={}, session_index={}, error={})", + block_entry.parent_hash, + block_entry.session, + session_info_res.error().message()); + return; + } + if (!opt_session_info) { + logger_->debug( + "Can't obtain SessionInfo. (parent_hash={}, session_index={})", + block_entry.parent_hash, + block_entry.session); + return; + } + + runtime::SessionInfo &session_info = *opt_session_info; const auto block_tick = slotNumberToTick(config_.slot_duration_millis, block_entry.slot); const auto no_show_duration = slotNumberToTick(config_.slot_duration_millis, @@ -2261,6 +2741,10 @@ namespace kagome::parachain { BOOST_ASSERT(storedCandidateEntries().get(candidate_hash)->get() == candidate_entry); } + SL_TRACE(logger_, + "Wakeup processed. (should trigger={}, cert={})", + should_trigger, + (bool)maybe_cert); if (maybe_cert) { const auto &[cert, val_index, tranche] = *maybe_cert; @@ -2295,6 +2779,114 @@ namespace kagome::parachain { approval::WakeupProcessed{}); } + void ApprovalDistribution::unify_with_peer( + StoreUnit> &entries, + const libp2p::peer::PeerId &peer_id, + const network::View &view) { + std::deque assignments_to_send; + std::deque approvals_to_send; + + const auto view_finalized_number = view.finalized_number_; + for (const auto &head : view.heads_) { + primitives::BlockHash block = head; + while (true) { + auto opt_entry = entries.get(block); + if (!opt_entry || opt_entry->get().number <= view_finalized_number) { + break; + } + + auto &entry = opt_entry->get(); + if (entry.known_by.count(peer_id) != 0ull) { + break; + } + + auto &peer_knowledge = entry.known_by[peer_id]; + for (uint32_t candidate_index = 0; + candidate_index < entry.candidates.size(); + ++candidate_index) { + auto &c = entry.candidates[candidate_index]; + for (auto &[validator, message_state] : c.messages) { + auto message_subject{ + std::make_tuple(block, candidate_index, validator)}; + const auto &[ref_cert, opt_ref_approval_sig] = visit_in_place( + message_state.approval_state, + [](const DistribApprovalStateAssigned &cert) + -> std::pair< + std::reference_wrapper, + std::optional< + std::reference_wrapper>> { + return std::make_pair(std::cref(cert), std::nullopt); + }, + [](const DistribApprovalStateApproved &val) + -> std::pair< + std::reference_wrapper, + std::optional< + std::reference_wrapper>> { + const auto &[cert, sig] = val; + return std::make_pair(std::cref(cert), std::cref(sig)); + }); + + if (!peer_knowledge.contains(message_subject, + approval::MessageKind::Assignment)) { + peer_knowledge.sent.insert(message_subject, + approval::MessageKind::Assignment); + + assignments_to_send.emplace_back(network::Assignment{ + .indirect_assignment_cert = + approval::IndirectAssignmentCert{ + .block_hash = block, + .validator = validator, + .cert = ref_cert.get(), + }, + .candidate_ix = candidate_index, + }); + } + + if (opt_ref_approval_sig) { + if (!peer_knowledge.contains(message_subject, + approval::MessageKind::Approval)) { + peer_knowledge.sent.insert(message_subject, + approval::MessageKind::Approval); + + approvals_to_send.emplace_back( + network::IndirectSignedApprovalVote{ + .payload = + { + .payload = + network::ApprovalVote{ + .block_hash = block, + .candidate_index = candidate_index, + }, + .ix = validator, + }, + .signature = opt_ref_approval_sig->get(), + }); + } + } + } + } + + block = entry.parent_hash; + } + } + + if (!assignments_to_send.empty()) { + SL_TRACE(logger_, + "Sending assignments to unified peer. (peer id={}, count={})", + peer_id, + assignments_to_send.size()); + send_assignments_batched(std::move(assignments_to_send), peer_id); + } + + if (!approvals_to_send.empty()) { + SL_TRACE(logger_, + "Sending approvals to unified peer. (peer id={}, count={})", + peer_id, + approvals_to_send.size()); + send_approvals_batched(std::move(approvals_to_send), peer_id); + } + } + primitives::BlockInfo ApprovalDistribution::approvedAncestor( const primitives::BlockInfo &min, const primitives::BlockInfo &max) const { diff --git a/core/parachain/approval/approval_distribution.hpp b/core/parachain/approval/approval_distribution.hpp index 025d11a63e..3e3bb15ee2 100644 --- a/core/parachain/approval/approval_distribution.hpp +++ b/core/parachain/approval/approval_distribution.hpp @@ -28,6 +28,7 @@ #include "network/peer_view.hpp" #include "network/types/collator_messages.hpp" #include "parachain/approval/approved_ancestor.hpp" +#include "parachain/approval/knowledge.hpp" #include "parachain/approval/store.hpp" #include "parachain/availability/recovery/recovery.hpp" #include "parachain/validator/parachain_processor.hpp" @@ -107,13 +108,13 @@ namespace kagome::parachain { ApprovalEntry( GroupIndex group_index, - std::optional> assignment, + std::optional> assignment, size_t assignments_size) : backing_group{group_index}, our_assignment{assignment}, approved(false) { assignments.bits.insert( - assignments.bits.begin(), assignments_size, false); + assignments.bits.end(), assignments_size, false); } /// Whether a validator is already assigned. @@ -208,7 +209,7 @@ namespace kagome::parachain { SessionIndex session_index, size_t approvals_size) : candidate(receipt), session(session_index) { - approvals.bits.insert(approvals.bits.begin(), approvals_size, false); + approvals.bits.insert(approvals.bits.end(), approvals_size, false); } std::optional> approval_entry( @@ -278,6 +279,19 @@ namespace kagome::parachain { /// AppStateManager impl bool prepare(); + using CandidateIncludedList = + std::vector>; + using AssignmentsList = std::unordered_map; + + static AssignmentsList compute_assignments( + const std::shared_ptr &keystore, + const runtime::SessionInfo &config, + const RelayVRFStory &relay_vrf_story, + const CandidateIncludedList &leaving_cores); + void onValidationProtocolMsg( const libp2p::peer::PeerId &peer_id, const network::ValidatorProtocolMessage &message); @@ -296,13 +310,6 @@ namespace kagome::parachain { const primitives::BlockInfo &max) const override; private: - using CandidateIncludedList = - std::vector>; - using AssignmentsList = std::unordered_map; - struct ImportedBlockInfo { CandidateIncludedList included_candidates; SessionIndex session_index; @@ -310,27 +317,24 @@ namespace kagome::parachain { size_t n_validators; RelayVRFStory relay_vrf_story; consensus::babe::BabeSlotNumber slot; - runtime::SessionInfo session_info; std::optional force_approve; }; struct ApprovingContext { primitives::BlockHeader block_header; std::optional included_candidates; - std::optional session_index; std::optional babe_block_header; std::optional babe_epoch; std::optional randomness; std::optional authorities; - std::optional session_info; std::shared_ptr complete_callback_context; std::function &&)> complete_callback; bool is_complete() const { - return included_candidates && babe_epoch && session_index - && session_info && babe_block_header && randomness && authorities; + return included_candidates && babe_epoch && babe_block_header + && randomness && authorities; } }; @@ -354,14 +358,24 @@ namespace kagome::parachain { /// for the same candidate, if it is included by multiple blocks - this is /// likely the case when there are forks. struct DistribCandidateEntry { - std::unordered_map messages; + std::unordered_map messages{}; }; /// Information about blocks in our current view as well as whether peers /// know of them. struct DistribBlockEntry { /// A votes entry for each candidate indexed by [`CandidateIndex`]. - std::vector candidates; + std::vector candidates{}; + /// Our knowledge of messages. + approval::Knowledge knowledge{}; + /// Peers who we know are aware of this block and thus, the candidates + /// within it. This maps to their knowledge of messages. + std::unordered_map + known_by{}; + /// The number of the block. + primitives::BlockNumber number; + /// The parent hash of the block. + RelayHash parent_hash; }; /// Metadata regarding approval of a particular block, by way of approval of @@ -371,7 +385,6 @@ namespace kagome::parachain { primitives::BlockHash parent_hash; primitives::BlockNumber block_number; SessionIndex session; - runtime::SessionInfo session_info; consensus::babe::BabeSlotNumber slot; RelayVRFStory relay_vrf_story; // The candidates included as-of this block and the index of the core they @@ -420,11 +433,12 @@ namespace kagome::parachain { /// Information about a block and imported candidates. struct BlockImportedCandidates { - primitives::BlockHash block_hash; - primitives::BlockNumber block_number; - network::Tick block_tick; - network::Tick no_show_duration; - std::vector> imported_candidates; + primitives::BlockHash block_hash{}; + primitives::BlockNumber block_number{}; + network::Tick block_tick{}; + network::Tick no_show_duration{}; + std::vector> + imported_candidates{}; }; using AssignmentOrApproval = @@ -476,12 +490,6 @@ namespace kagome::parachain { primitives::AuthorityList, primitives::Randomness>>; - AssignmentsList compute_assignments( - const std::shared_ptr &keystore, - const runtime::SessionInfo &config, - const RelayVRFStory &relay_vrf_story, - const CandidateIncludedList &leaving_cores); - void imported_block_info(const primitives::BlockHash &block_hash, const primitives::BlockHeader &block_header); @@ -529,16 +537,24 @@ namespace kagome::parachain { template void for_ACU(const primitives::BlockHash &block_hash, Func &&func); - void try_process_approving_context(ApprovingContextUnit &acu); + void try_process_approving_context( + ApprovingContextUnit &acu, + SessionIndex session_index, + const runtime::SessionInfo &session_info); - std::optional> + static std::optional> findAssignmentKey(const std::shared_ptr &keystore, const runtime::SessionInfo &config); + void unify_with_peer(StoreUnit> &entries, + const libp2p::peer::PeerId &peer_id, + const network::View &view); + outcome::result processImportedBlock( primitives::BlockNumber block_number, const primitives::BlockHash &block_hash, const primitives::BlockHash &parent_hash, + primitives::BlockNumber finalized_block_number, ImportedBlockInfo &&block_info); outcome::result>> @@ -546,7 +562,7 @@ namespace kagome::parachain { const primitives::BlockHash &block_hash, const primitives::BlockHash &parent_hash, scale::BitVec &&approved_bitfield, - ImportedBlockInfo &&block_info); + const ImportedBlockInfo &block_info); void on_active_leaves_update(const network::ExView &updated); @@ -576,7 +592,8 @@ namespace kagome::parachain { const network::CandidateReceipt &candidate, GroupIndex backing_group); - void runNewBlocks(approval::BlockApprovalMeta &&approval_meta); + void runNewBlocks(approval::BlockApprovalMeta &&approval_meta, + primitives::BlockNumber finalized_block_number); std::optional sign_approval( const crypto::Sr25519PublicKey &pubkey, @@ -607,9 +624,19 @@ namespace kagome::parachain { void runDistributeAssignment( const approval::IndirectAssignmentCert &indirect_cert, - CandidateIndex candidate_index); + CandidateIndex candidate_index, + std::unordered_set &&peers); + + void send_assignments_batched(std::deque &&assignments, + const libp2p::peer::PeerId &peer_id); - void runDistributeApproval(const network::IndirectSignedApprovalVote &vote); + void send_approvals_batched( + std::deque &&approvals, + const libp2p::peer::PeerId &peer_id); + + void runDistributeApproval( + const network::IndirectSignedApprovalVote &vote, + std::unordered_set &&peers); void runScheduleWakeup(const primitives::BlockHash &block_hash, primitives::BlockNumber block_number, @@ -618,6 +645,9 @@ namespace kagome::parachain { void clearCaches(const primitives::events::ChainEventParams &event); + void store_remote_view(const libp2p::peer::PeerId &peer_id, + const network::View &view); + auto &storedBlocks() { return as>>(store_); @@ -653,6 +683,7 @@ namespace kagome::parachain { const ApprovalVotingSubsystem config_; std::shared_ptr peer_view_; network::PeerView::MyViewSubscriberPtr my_view_sub_; + network::PeerView::PeerViewSubscriberPtr remote_view_sub_; std::shared_ptr chain_sub_; Store>, @@ -676,14 +707,24 @@ namespace kagome::parachain { Hash, std::vector>> pending_known_; + std::unordered_map peer_views_; + std::map> + blocks_by_number_; /// thread_pool_ context access - using ScheduledCandidateTimer = - std::unordered_map>>; + using ScheduledCandidateTimer = std::unordered_map< + CandidateHash, + std::vector>>>; std::unordered_map active_tranches_; + struct ApprovalCache { + std::unordered_set blocks_; + ApprovalOutcome approval_result; + }; + SafeObject, std::mutex> + approvals_cache_; + log::Logger logger_ = log::createLogger("ApprovalDistribution", "parachain"); }; diff --git a/core/parachain/approval/knowledge.hpp b/core/parachain/approval/knowledge.hpp new file mode 100644 index 0000000000..0bfe2cfed2 --- /dev/null +++ b/core/parachain/approval/knowledge.hpp @@ -0,0 +1,80 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef KAGOME_KNOWLEDGE_HPP +#define KAGOME_KNOWLEDGE_HPP + +#include + +#include "common/visitor.hpp" +#include "consensus/babe/common.hpp" +#include "consensus/validation/prepare_transcript.hpp" +#include "outcome/outcome.hpp" +#include "parachain/approval/state.hpp" +#include "parachain/types.hpp" + +namespace kagome::parachain::approval { + + enum struct MessageKind { Assignment, Approval }; + using MessageSubject = std::tuple; + + struct MessageSubjectHash { + auto operator()(const MessageSubject &obj) const { + size_t value{0ull}; + std::apply( + [&](const auto &...v) { (..., boost::hash_combine(value, v)); }, obj); + return value; + } + }; + + struct Knowledge { + // When there is no entry, this means the message is unknown + // When there is an entry with `MessageKind::Assignment`, the assignment is + // known. When there is an entry with `MessageKind::Approval`, the + // assignment and approval are known. + std::unordered_map + known_messages{}; + + bool contains(const MessageSubject &message, + const MessageKind &kind) const { + auto it = known_messages.find(message); + if (it == known_messages.end()) { + return false; + } + if (MessageKind::Assignment == kind) { + return true; + } + return MessageKind::Approval == it->second; + } + + bool insert(const MessageSubject &message, const MessageKind &kind) { + const auto &[it, inserted] = known_messages.emplace(message, kind); + if (inserted) { + return true; + } + if (MessageKind::Assignment == it->second + && MessageKind::Approval == kind) { + it->second = MessageKind::Approval; + return true; + } + return false; + } + }; + + struct PeerKnowledge { + /// The knowledge we've sent to the peer. + Knowledge sent{}; + /// The knowledge we've received from the peer. + Knowledge received{}; + + bool contains(const MessageSubject &message, + const MessageKind &kind) const { + return sent.contains(message, kind) || received.contains(message, kind); + } + }; + +} // namespace kagome::parachain::approval + +#endif // KAGOME_KNOWLEDGE_HPP diff --git a/test/core/CMakeLists.txt b/test/core/CMakeLists.txt index c24182ea26..5d0da9502a 100644 --- a/test/core/CMakeLists.txt +++ b/test/core/CMakeLists.txt @@ -24,3 +24,4 @@ add_subdirectory(subscription) add_subdirectory(telemetry) add_subdirectory(transaction_pool) add_subdirectory(dispute_coordinator) +add_subdirectory(parachain) diff --git a/test/core/parachain/CMakeLists.txt b/test/core/parachain/CMakeLists.txt new file mode 100644 index 0000000000..37c23ba2d3 --- /dev/null +++ b/test/core/parachain/CMakeLists.txt @@ -0,0 +1,13 @@ +# +# Copyright Soramitsu Co., Ltd. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +# + +addtest(assignments_test + assignments.cpp + ) +target_link_libraries(assignments_test + crypto_store + base_fs_test + validator_parachain + ) diff --git a/test/core/parachain/assignments.cpp b/test/core/parachain/assignments.cpp new file mode 100644 index 0000000000..d6e7cbe765 --- /dev/null +++ b/test/core/parachain/assignments.cpp @@ -0,0 +1,273 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "crypto/crypto_store/crypto_store_impl.hpp" + +#include + +#include "crypto/bip39/impl/bip39_provider_impl.hpp" +#include "crypto/ecdsa/ecdsa_provider_impl.hpp" +#include "crypto/ed25519/ed25519_provider_impl.hpp" +#include "crypto/hasher/hasher_impl.hpp" +#include "crypto/pbkdf2/impl/pbkdf2_provider_impl.hpp" +#include "crypto/random_generator/boost_generator.hpp" +#include "crypto/sr25519/sr25519_provider_impl.hpp" + +#include +#include +#include "testutil/outcome.hpp" +#include "testutil/prepare_loggers.hpp" +#include "testutil/storage/base_fs_test.hpp" + +#include "common/blob.hpp" +#include "common/visitor.hpp" +#include "parachain/approval/approval_distribution.hpp" + +using kagome::common::Blob; +using kagome::common::Buffer; +using namespace kagome::crypto; + +static CryptoStoreImpl::Path assignments_directory = + kagome::filesystem::temp_directory_path() / "assignments_test"; + +struct AssignmentsTest : public test::BaseFS_Test { + static void SetUpTestCase() { + testutil::prepareLoggers(); + } + + AssignmentsTest() : BaseFS_Test(assignments_directory) {} + + void SetUp() override {} + + template + auto assignment_keys_plus_random(std::shared_ptr &cs, + const char *const (&accounts)[N], + size_t random) { + for (const auto &acc : accounts) { + [[maybe_unused]] auto _ = + cs->generateSr25519Keypair(KnownKeyTypeId::KEY_TYPE_ASGN, + std::string_view{acc}) + .value(); + } + for (size_t ix = 0ull; ix < random; ++ix) { + auto seed = std::to_string(ix); + [[maybe_unused]] auto _ = + cs->generateSr25519Keypair(KnownKeyTypeId::KEY_TYPE_ASGN, + std::string_view{seed}) + .value(); + } + return cs->getSr25519PublicKeys(KnownKeyTypeId::KEY_TYPE_ASGN).value(); + } + + auto create_crypto_store() { + auto hasher = std::make_shared(); + auto csprng = std::make_shared(); + auto ecdsa_provider = std::make_shared(hasher); + auto ed25519_provider = std::make_shared(hasher); + auto sr25519_provider = std::make_shared(); + + auto pbkdf2_provider = std::make_shared(); + auto bip39_provider = + std::make_shared(std::move(pbkdf2_provider), hasher); + + auto keystore_path = kagome::filesystem::path(__FILE__).parent_path() + / "subkey_keys" / "keystore"; + + return std::make_shared( + std::make_shared(std::move(ecdsa_provider)), + std::make_shared(std::move(ed25519_provider)), + std::make_shared(std::move(sr25519_provider)), + bip39_provider, + csprng, + kagome::crypto::KeyFileStorage::createAt(keystore_path).value()); + } +}; + +/** + * There should be no assignments in no cores available. + */ +TEST_F(AssignmentsTest, succeeds_empty_for_0_cores) { + auto cs = create_crypto_store(); + auto asgn_keys = + assignment_keys_plus_random(cs, {"//Alice", "//Bob", "//Charlie"}, 0ull); + + ::RelayVRFStory vrf_story; + ::memset(vrf_story.data, 42, sizeof(vrf_story.data)); + + kagome::runtime::SessionInfo si; + for (const auto &a : asgn_keys) { + si.assignment_keys.emplace_back(a); + } + + si.n_cores = 0; + si.zeroth_delay_tranche_width = 10; + si.relay_vrf_modulo_samples = 3; + si.n_delay_tranches = 40; + + kagome::parachain::ApprovalDistribution::CandidateIncludedList + leaving_cores{}; + + auto assignments = + kagome::parachain::ApprovalDistribution::compute_assignments( + cs, si, vrf_story, leaving_cores); + + ASSERT_EQ(assignments.size(), 0ull); +} + +/** + * There should be an assignment for a 1 core. + */ +TEST_F(AssignmentsTest, assign_to_nonzero_core) { + auto cs = create_crypto_store(); + auto asgn_keys = + assignment_keys_plus_random(cs, {"//Alice", "//Bob", "//Charlie"}, 0ull); + + auto c_a = + kagome::common::Hash256::fromHexWithPrefix( + "0x0000000000000000000000000000000000000000000000000000000000000000") + .value(); + auto c_b = + kagome::common::Hash256::fromHexWithPrefix( + "0x0101010101010101010101010101010101010101010101010101010101010101") + .value(); + + ::RelayVRFStory vrf_story; + ::memset(vrf_story.data, 42, sizeof(vrf_story.data)); + + kagome::runtime::SessionInfo si; + for (const auto &a : asgn_keys) { + si.assignment_keys.emplace_back(a); + } + + si.validator_groups.emplace_back( + std::vector{0}); + si.validator_groups.emplace_back( + std::vector{1, 2}); + si.n_cores = 2; + si.zeroth_delay_tranche_width = 10; + si.relay_vrf_modulo_samples = 3; + si.n_delay_tranches = 40; + + kagome::parachain::ApprovalDistribution::CandidateIncludedList leaving_cores = + {std::make_tuple(c_a, + kagome::network::CandidateReceipt{}, + (kagome::parachain::CoreIndex)0, + (kagome::parachain::GroupIndex)0), + std::make_tuple(c_b, + kagome::network::CandidateReceipt{}, + (kagome::parachain::CoreIndex)1, + (kagome::parachain::GroupIndex)1)}; + auto assignments = + kagome::parachain::ApprovalDistribution::compute_assignments( + cs, si, vrf_story, leaving_cores); + + ASSERT_EQ(assignments.size(), 1ull); + + auto it = assignments.find(1ull); + ASSERT_TRUE(it != assignments.end()); + + const kagome::parachain::ApprovalDistribution::OurAssignment &our_assignment = + it->second; + ASSERT_EQ(our_assignment.tranche, 0ull); + ASSERT_EQ(our_assignment.validator_index, 0); + ASSERT_EQ(our_assignment.triggered, false); + + ASSERT_TRUE( + kagome::is_type( + our_assignment.cert.kind)); + if (auto k = + kagome::if_type( + our_assignment.cert.kind)) { + ASSERT_EQ(k->get().sample, 2); + } + + const uint8_t + vrf_output[kagome::crypto::constants::sr25519::vrf::OUTPUT_SIZE] = { + 228, 179, 248, 78, 77, 169, 23, 184, 138, 204, 148, + 183, 13, 41, 176, 163, 162, 6, 237, 158, 220, 225, + 97, 251, 51, 144, 207, 239, 189, 2, 7, 66}; + ASSERT_EQ(0, + memcmp(&our_assignment.cert.vrf.output, + vrf_output, + kagome::crypto::constants::sr25519::vrf::OUTPUT_SIZE)); +} + +/** + * There should be an assignment for a 1 core. + */ +TEST_F(AssignmentsTest, assignments_produced_for_non_backing) { + auto cs = create_crypto_store(); + auto asgn_keys = + assignment_keys_plus_random(cs, {"//Alice", "//Bob", "//Charlie"}, 0ull); + + auto c_a = + kagome::common::Hash256::fromHexWithPrefix( + "0x0000000000000000000000000000000000000000000000000000000000000000") + .value(); + auto c_b = + kagome::common::Hash256::fromHexWithPrefix( + "0x0101010101010101010101010101010101010101010101010101010101010101") + .value(); + + ::RelayVRFStory vrf_story; + ::memset(vrf_story.data, 42, sizeof(vrf_story.data)); + + kagome::runtime::SessionInfo si; + for (const auto &a : asgn_keys) { + si.assignment_keys.emplace_back(a); + } + + si.validator_groups.emplace_back( + std::vector{0}); + si.validator_groups.emplace_back( + std::vector{1, 2}); + si.n_cores = 2; + si.zeroth_delay_tranche_width = 10; + si.relay_vrf_modulo_samples = 3; + si.n_delay_tranches = 40; + + kagome::parachain::ApprovalDistribution::CandidateIncludedList leaving_cores = + {std::make_tuple(c_a, + kagome::network::CandidateReceipt{}, + (kagome::parachain::CoreIndex)0, + (kagome::parachain::GroupIndex)1), + std::make_tuple(c_b, + kagome::network::CandidateReceipt{}, + (kagome::parachain::CoreIndex)1, + (kagome::parachain::GroupIndex)0)}; + auto assignments = + kagome::parachain::ApprovalDistribution::compute_assignments( + cs, si, vrf_story, leaving_cores); + + ASSERT_EQ(assignments.size(), 1ull); + + auto it = assignments.find(0ull); + ASSERT_TRUE(it != assignments.end()); + + const kagome::parachain::ApprovalDistribution::OurAssignment &our_assignment = + it->second; + ASSERT_EQ(our_assignment.tranche, 0ull); + ASSERT_EQ(our_assignment.validator_index, 0); + ASSERT_EQ(our_assignment.triggered, false); + + ASSERT_TRUE( + kagome::is_type( + our_assignment.cert.kind)); + if (auto k = + kagome::if_type( + our_assignment.cert.kind)) { + ASSERT_EQ(k->get().sample, 0); + } + + const uint8_t + vrf_output[kagome::crypto::constants::sr25519::vrf::OUTPUT_SIZE] = { + 34, 247, 30, 171, 146, 67, 68, 83, 108, 206, 61, + 154, 115, 28, 180, 81, 28, 90, 68, 166, 49, 220, + 157, 41, 235, 223, 152, 45, 190, 202, 216, 39}; + ASSERT_EQ(0, + memcmp(&our_assignment.cert.vrf.output, + vrf_output, + kagome::crypto::constants::sr25519::vrf::OUTPUT_SIZE)); +} \ No newline at end of file