Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Suggestions for "fix(mining): Advertise mined blocks" #9183

Open
wants to merge 4 commits into
base: advertise-mined-blocks
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 10 additions & 14 deletions zebra-rpc/src/methods/get_block_template_rpcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ use crate::{
hex_data::HexData,
GetBlockHash,
},
server::{self, error::MapError},
server::{
self,
error::{MapError, OkOrError},
},
};

pub mod constants;
Expand Down Expand Up @@ -377,7 +380,8 @@ pub struct GetBlockTemplateRpcImpl<
/// Address book of peers, used for `getpeerinfo`.
address_book: AddressBook,

/// The sender part of a Channel to avertise mined blocks by this node to the network.
/// A channel to send successful block submissions to the block gossip task,
/// so they can be advertised to peers.
mined_block_sender: watch::Sender<(block::Hash, block::Height)>,
}

Expand Down Expand Up @@ -942,11 +946,9 @@ where
}
};

let block_height = block.coinbase_height().ok_or(ErrorObject::owned(
0,
"coinbase height not found".to_string(),
None::<()>,
))?;
let block_height = block
.coinbase_height()
.ok_or_error(0, "coinbase height not found")?;
let block_hash = block.hash();

let block_verifier_router_response = block_verifier_router
Expand All @@ -968,13 +970,7 @@ where

self.mined_block_sender
.send((block_hash, block_height))
.map_err(|e| {
ErrorObject::owned(
0,
format!("failed to send mined block: {e}"),
None::<()>,
)
})?;
.map_error_with_prefix(0, "failed to send mined block")?;

return Ok(submit_block::Response::Accepted);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,7 @@ where
Tip: ChainTip + Clone + Send + Sync + 'static,
SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
{
// TODO:
// - Add a `disable_peers` field to `Network` to check instead of `disable_pow()` (#8361)
// - Check the field in `sync_status` so it applies to the mempool as well.
if network.disable_pow() {
if network.is_a_test_network() {
return Ok(());
}

Expand Down
23 changes: 23 additions & 0 deletions zebra-rpc/src/server/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ pub(crate) trait MapError<T>: Sized {
/// Maps errors to [`jsonrpsee_types::ErrorObjectOwned`] with a specific error code.
fn map_error(self, code: impl Into<ErrorCode>) -> std::result::Result<T, ErrorObjectOwned>;

/// Maps errors to [`jsonrpsee_types::ErrorObjectOwned`] with a prefixed message and a specific error code.
#[cfg(feature = "getblocktemplate-rpcs")]
fn map_error_with_prefix(
self,
code: impl Into<ErrorCode>,
msg_prefix: impl ToString,
) -> Result<T, ErrorObjectOwned>;

/// Maps errors to [`jsonrpsee_types::ErrorObjectOwned`] with a [`LegacyCode::Misc`] error code.
fn map_misc_error(self) -> std::result::Result<T, ErrorObjectOwned> {
self.map_error(LegacyCode::Misc)
Expand Down Expand Up @@ -98,6 +106,21 @@ where
fn map_error(self, code: impl Into<ErrorCode>) -> Result<T, ErrorObjectOwned> {
self.map_err(|error| ErrorObject::owned(code.into().code(), error.to_string(), None::<()>))
}

#[cfg(feature = "getblocktemplate-rpcs")]
fn map_error_with_prefix(
self,
code: impl Into<ErrorCode>,
msg_prefix: impl ToString,
) -> Result<T, ErrorObjectOwned> {
self.map_err(|error| {
ErrorObject::owned(
code.into().code(),
format!("{}: {}", msg_prefix.to_string(), error.to_string()),
None::<()>,
)
})
}
}

impl<T> OkOrError<T> for Option<T> {
Expand Down
23 changes: 13 additions & 10 deletions zebrad/src/components/inbound/tests/fake_peer_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{collections::HashSet, iter, net::SocketAddr, str::FromStr, sync::Arc,
use futures::FutureExt;
use tokio::{sync::oneshot, task::JoinHandle, time::timeout};
use tower::{buffer::Buffer, builder::ServiceBuilder, util::BoxService, Service, ServiceExt};
use tracing::Span;
use tracing::{Instrument, Span};

use zebra_chain::{
amount::Amount,
Expand Down Expand Up @@ -978,15 +978,18 @@ async fn setup(

#[cfg(feature = "getblocktemplate-rpcs")]
let submitblock_channel = SubmitBlockChannel::new();
let sync_gossip_task_handle = tokio::spawn(sync::gossip_best_tip_block_hashes(
sync_status.clone(),
chain_tip_change.clone(),
peer_set.clone(),
#[cfg(feature = "getblocktemplate-rpcs")]
Some(submitblock_channel.receiver()),
#[cfg(not(feature = "getblocktemplate-rpcs"))]
None,
));
let sync_gossip_task_handle = tokio::spawn(
sync::gossip_best_tip_block_hashes(
sync_status.clone(),
chain_tip_change.clone(),
peer_set.clone(),
#[cfg(feature = "getblocktemplate-rpcs")]
Some(submitblock_channel.receiver()),
#[cfg(not(feature = "getblocktemplate-rpcs"))]
None,
)
.in_current_span(),
);

let tx_gossip_task_handle = tokio::spawn(gossip_mempool_transaction_id(
transaction_receiver,
Expand Down
17 changes: 10 additions & 7 deletions zebrad/src/components/inbound/tests/real_peer_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,7 @@ async fn setup(
mod submitblock_test {
use std::io;
use std::sync::{Arc, Mutex};
use tracing::Level;
use tracing::{Instrument, Level};
use tracing_subscriber::fmt;

use super::*;
Expand Down Expand Up @@ -872,12 +872,15 @@ mod submitblock_test {

// Start the block gossip task with a SubmitBlockChannel
let submitblock_channel = SubmitBlockChannel::new();
let gossip_task_handle = tokio::spawn(sync::gossip_best_tip_block_hashes(
sync_status.clone(),
chain_tip_change,
peer_set.clone(),
Some(submitblock_channel.receiver()),
));
let gossip_task_handle = tokio::spawn(
sync::gossip_best_tip_block_hashes(
sync_status.clone(),
chain_tip_change,
peer_set.clone(),
Some(submitblock_channel.receiver()),
)
.in_current_span(),
);

// Send a block top the channel
submitblock_channel
Expand Down
78 changes: 43 additions & 35 deletions zebrad/src/components/sync/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
//!
//! [`block::Hash`]: zebra_chain::block::Hash

use futures::TryFutureExt;
use thiserror::Error;
use tokio::sync::watch;
use tower::{timeout::Timeout, Service, ServiceExt};
use tracing::Instrument;

use zebra_chain::{block, chain_sync_status::ChainSyncStatus};
use zebra_chain::block;
use zebra_network as zn;
use zebra_state::ChainTipChange;

Expand Down Expand Up @@ -44,10 +46,10 @@ pub enum BlockGossipError {
///
/// [`block::Hash`]: zebra_chain::block::Hash
pub async fn gossip_best_tip_block_hashes<ZN>(
mut sync_status: SyncStatus,
sync_status: SyncStatus,
mut chain_state: ChainTipChange,
broadcast_network: ZN,
mut receiver: Option<watch::Receiver<(block::Hash, block::Height)>>,
mut mined_block_receiver: Option<watch::Receiver<(block::Hash, block::Height)>>,
) -> Result<(), BlockGossipError>
where
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
Expand All @@ -60,50 +62,56 @@ where
let mut broadcast_network = Timeout::new(broadcast_network, TIPS_RESPONSE_TIMEOUT);

loop {
// build a request if the sync status is close to the tip, or if we receive a block in the submitblock channel.
let request = if sync_status.is_close_to_tip() {
let mut sync_status = sync_status.clone();
let mut chain_tip = chain_state.clone();
let tip_change_close_to_network_tip_fut = async move {
// wait for at least one tip change, to make sure we have a new block hash to broadcast
let tip_action = chain_state.wait_for_tip_change().await.map_err(TipChange)?;
let tip_action = chain_tip.wait_for_tip_change().await.map_err(TipChange)?;

// wait until we're close to the tip, because broadcasts are only useful for nodes near the tip
// (if they're a long way from the tip, they use the syncer and block locators)
// (if they're a long way from the tip, they use the syncer and block locators), unless a mined block
// hash is received before `wait_until_close_to_tip()` is ready.
sync_status
.wait_until_close_to_tip()
.await
.map_err(SyncStatus)?;
.map_err(SyncStatus)
.await?;

// get the latest tip change - it might be different to the change we awaited,
// get the latest tip change when close to tip - it might be different to the change we awaited,
// because the syncer might take a long time to reach the tip
let tip_action = chain_state.last_tip_change().unwrap_or(tip_action);

// block broadcasts inform other nodes about new blocks,
// so our internal Grow or Reset state doesn't matter to them
let request = zn::Request::AdvertiseBlock(tip_action.best_tip_hash());

let height = tip_action.best_tip_height();
debug!(?height, ?request, "sending committed block broadcast");
request
} else if receiver.is_some()
&& receiver
.clone()
.unwrap()
.has_changed()
.map_err(SyncStatus)?
{
// we have a new block to broadcast from the `submitblock `RPC method, get block data and release the channel.
let (hash, height) = *receiver.as_mut().unwrap().borrow_and_update();
let best_tip = chain_tip
.last_tip_change()
.unwrap_or(tip_action)
.best_tip_hash_and_height();

// build a request with the obtained hash.
let request = zn::Request::AdvertiseBlock(hash);
Ok((best_tip, "sending committed block broadcast", chain_tip))
}
.in_current_span();

info!(?height, ?request, "sending mined block broadcast");
request
let ((hash, height), log_msg, updated_chain_state) = if let Some(mined_block_receiver) =
mined_block_receiver.as_mut()
{
tokio::select! {
tip_change_close_to_network_tip = tip_change_close_to_network_tip_fut => {
mined_block_receiver.mark_unchanged();
tip_change_close_to_network_tip?
},

Ok(_) = mined_block_receiver.changed() => {
// we have a new block to broadcast from the `submitblock `RPC method, get block data and release the channel.
(*mined_block_receiver.borrow_and_update(), "sending mined block broadcast", chain_state)
}
}
} else {
// we don't have a new block to broadcast, so wait for a new one.
tokio::time::sleep(PEER_GOSSIP_DELAY).await;
continue;
tip_change_close_to_network_tip_fut.await?
};

chain_state = updated_chain_state;

// block broadcasts inform other nodes about new blocks,
// so our internal Grow or Reset state doesn't matter to them
let request = zn::Request::AdvertiseBlock(hash);

info!(?height, ?request, log_msg);
// broadcast requests don't return errors, and we'd just want to ignore them anyway
let _ = broadcast_network
.ready()
Expand Down
Loading