diff --git a/cw-orch-daemon/Cargo.toml b/cw-orch-daemon/Cargo.toml index 2af2f847..0db76087 100644 --- a/cw-orch-daemon/Cargo.toml +++ b/cw-orch-daemon/Cargo.toml @@ -78,6 +78,14 @@ uid = "0.1.7" toml = "0.8" http = "1.1.0" libc-print = "0.1.23" +tower = { version = "0.5.1", features = ["reconnect"] } +pin-project-lite = "0.2.15" +futures-core = "0.3.31" +futures = "0.3.31" +futures-util = "0.3.31" +hyper = "1.5.0" +http-body-util = "0.1.2" +http-body = "1.0.1" [dev-dependencies] cw-orch-daemon = { path = "." } diff --git a/cw-orch-daemon/examples/manual_sender.rs b/cw-orch-daemon/examples/manual_sender.rs index 9c9b4fdc..5643b60d 100644 --- a/cw-orch-daemon/examples/manual_sender.rs +++ b/cw-orch-daemon/examples/manual_sender.rs @@ -18,10 +18,10 @@ use cosmrs::{AccountId, Any}; use cosmwasm_std::Addr; use cw_orch::prelude::*; use cw_orch_core::environment::ChainInfoOwned; +use cw_orch_daemon::Channel; use prost::Message; use std::io::{self, Write}; use std::sync::Arc; -use tonic::transport::Channel; // ANCHOR: full_counter_example use counter_contract::CounterContract; @@ -85,7 +85,7 @@ impl QuerySender for ManualSender { type Error = DaemonError; type Options = ManualSenderOptions; - fn channel(&self) -> tonic::transport::Channel { + fn channel(&self) -> Channel { self.grpc_channel.clone() } } diff --git a/cw-orch-daemon/examples/querier-daemon.rs b/cw-orch-daemon/examples/querier-daemon.rs index 54d85dc0..cb068b33 100644 --- a/cw-orch-daemon/examples/querier-daemon.rs +++ b/cw-orch-daemon/examples/querier-daemon.rs @@ -1,5 +1,7 @@ // ANCHOR: full_counter_example +use std::{thread::sleep, time::Duration}; + use cw_orch::{anyhow, prelude::*}; use cw_orch_daemon::senders::QueryOnlyDaemon; @@ -18,5 +20,12 @@ pub fn main() -> anyhow::Result<()> { .balance(&Addr::unchecked(LOCAL_JUNO_SENDER), None)?; assert!(!balances.is_empty()); + sleep(Duration::from_secs(10)); + log::info!("Resuming queries"); + let balances = chain + .bank_querier() + .balance(&Addr::unchecked(LOCAL_JUNO_SENDER), None)?; + assert!(!balances.is_empty()); + Ok(()) } diff --git a/cw-orch-daemon/src/channel.rs b/cw-orch-daemon/src/channel.rs index 6f58ac46..c1b2b7ab 100644 --- a/cw-orch-daemon/src/channel.rs +++ b/cw-orch-daemon/src/channel.rs @@ -3,16 +3,22 @@ use cosmrs::proto::cosmos::base::tendermint::v1beta1::{ }; use cw_orch_core::{environment::ChainInfoOwned, log::connectivity_target}; use http::Uri; -use tonic::transport::{Channel, ClientTlsConfig, Endpoint}; +use tonic::transport::{ClientTlsConfig, Endpoint}; +use tower::ServiceBuilder; use super::error::DaemonError; +use crate::service::reconnect::{ChannelCreationArgs, ChannelFactory, Reconnect}; +use crate::service::retry::{Retry, RetryAttemps, RetryLayer}; /// A helper for constructing a gRPC channel pub struct GrpcChannel {} +pub type Channel = Reconnect; +pub type TowerChannel = Retry; + impl GrpcChannel { /// Connect to any of the provided gRPC endpoints - pub async fn connect(grpc: &[String], chain_id: &str) -> Result { + pub async fn get_channel(grpc: &[String], chain_id: &str) -> Result { if grpc.is_empty() { return Err(DaemonError::GRPCListIsEmpty); } @@ -66,13 +72,35 @@ impl GrpcChannel { return Err(DaemonError::CannotConnectGRPC); } - Ok(successful_connections.pop().unwrap()) + let retry_policy = RetryAttemps::count(3); + let retry_layer = RetryLayer::new(retry_policy); + + let service = ServiceBuilder::new() + .layer(retry_layer) + .service(successful_connections.pop().unwrap()); + + Ok(service) + } + + pub async fn connect(grpc: &[String], chain_id: &str) -> Result { + let target = (grpc.to_vec(), chain_id.to_string()); + let channel = Reconnect::new(ChannelFactory {}, target).with_attemps(3); + Self::verify_connection(channel.clone()).await?; + Ok(channel) } /// Create a gRPC channel from the chain info pub async fn from_chain_info(chain_info: &ChainInfoOwned) -> Result { GrpcChannel::connect(&chain_info.grpc_urls, &chain_info.chain_id).await } + + async fn verify_connection(channel: Channel) -> Result<(), DaemonError> { + let mut client = ServiceClient::new(channel.clone()); + + // Verify that we're able to query the node info + client.get_node_info(GetNodeInfoRequest {}).await?; + Ok(()) + } } #[cfg(test)] diff --git a/cw-orch-daemon/src/core.rs b/cw-orch-daemon/src/core.rs index f3b7f5fd..4e43f47f 100644 --- a/cw-orch-daemon/src/core.rs +++ b/cw-orch-daemon/src/core.rs @@ -1,6 +1,7 @@ use super::{ cosmos_modules, error::DaemonError, queriers::Node, senders::Wallet, tx_resp::CosmTxResponse, }; +use crate::Channel; use crate::{ queriers::CosmWasm, senders::{builder::SenderBuilder, query::QuerySender, tx::TxSender}, @@ -31,7 +32,6 @@ use std::{ str::{from_utf8, FromStr}, time::Duration, }; -use tonic::transport::Channel; pub const INSTANTIATE_2_TYPE_URL: &str = "/cosmwasm.wasm.v1.MsgInstantiateContract2"; diff --git a/cw-orch-daemon/src/lib.rs b/cw-orch-daemon/src/lib.rs index 18f529ea..a64e2da7 100644 --- a/cw-orch-daemon/src/lib.rs +++ b/cw-orch-daemon/src/lib.rs @@ -12,6 +12,7 @@ pub mod keys; pub mod live_mock; pub mod queriers; pub mod senders; +pub mod service; pub mod tx_broadcaster; pub mod tx_builder; diff --git a/cw-orch-daemon/src/live_mock.rs b/cw-orch-daemon/src/live_mock.rs index 98aae87c..3aef8010 100644 --- a/cw-orch-daemon/src/live_mock.rs +++ b/cw-orch-daemon/src/live_mock.rs @@ -4,6 +4,7 @@ use crate::queriers::Bank; use crate::queriers::CosmWasm; use crate::queriers::Staking; +use crate::Channel; use crate::RUNTIME; use cosmwasm_std::testing::{MockApi, MockStorage}; use cosmwasm_std::Addr; @@ -24,7 +25,6 @@ use cw_orch_core::environment::ChainInfoOwned; use cw_orch_core::environment::WasmQuerier; use std::marker::PhantomData; use std::str::FromStr; -use tonic::transport::Channel; use crate::channel::GrpcChannel; diff --git a/cw-orch-daemon/src/queriers/authz.rs b/cw-orch-daemon/src/queriers/authz.rs index 14766f87..72e0d2d7 100644 --- a/cw-orch-daemon/src/queriers/authz.rs +++ b/cw-orch-daemon/src/queriers/authz.rs @@ -1,9 +1,9 @@ +use crate::Channel; use crate::{cosmos_modules, error::DaemonError, Daemon}; use cosmrs::proto::cosmos::base::query::v1beta1::PageRequest; use cosmwasm_std::Addr; use cw_orch_core::environment::{Querier, QuerierGetter}; use tokio::runtime::Handle; -use tonic::transport::Channel; /// Queries for Cosmos AuthZ Module /// All the async function are prefixed with `_` diff --git a/cw-orch-daemon/src/queriers/bank.rs b/cw-orch-daemon/src/queriers/bank.rs index ac7fa383..2fc19b81 100644 --- a/cw-orch-daemon/src/queriers/bank.rs +++ b/cw-orch-daemon/src/queriers/bank.rs @@ -1,9 +1,9 @@ +use crate::Channel; use crate::{cosmos_modules, error::DaemonError, senders::query::QuerySender, DaemonBase}; use cosmrs::proto::cosmos::base::query::v1beta1::PageRequest; use cosmwasm_std::{Addr, Coin, StdError}; use cw_orch_core::environment::{BankQuerier, Querier, QuerierGetter}; use tokio::runtime::Handle; -use tonic::transport::Channel; /// Queries for Cosmos Bank Module /// All the async function are prefixed with `_` diff --git a/cw-orch-daemon/src/queriers/cosmwasm.rs b/cw-orch-daemon/src/queriers/cosmwasm.rs index 6fa942e9..d122532d 100644 --- a/cw-orch-daemon/src/queriers/cosmwasm.rs +++ b/cw-orch-daemon/src/queriers/cosmwasm.rs @@ -2,6 +2,7 @@ use std::{marker::PhantomData, str::FromStr}; use crate::senders::query::QuerySender; use crate::senders::QueryOnlySender; +use crate::Channel; use crate::{cosmos_modules, error::DaemonError, DaemonBase}; use cosmrs::proto::cosmos::base::query::v1beta1::PageRequest; use cosmrs::AccountId; @@ -15,7 +16,6 @@ use cw_orch_core::{ environment::{Querier, QuerierGetter, WasmQuerier}, }; use tokio::runtime::Handle; -use tonic::transport::Channel; /// Querier for the CosmWasm SDK module /// All the async function are prefixed with `_` diff --git a/cw-orch-daemon/src/queriers/feegrant.rs b/cw-orch-daemon/src/queriers/feegrant.rs index a899f159..3a99eeaa 100644 --- a/cw-orch-daemon/src/queriers/feegrant.rs +++ b/cw-orch-daemon/src/queriers/feegrant.rs @@ -1,9 +1,9 @@ +use crate::Channel; use crate::{cosmos_modules, error::DaemonError, Daemon}; use cosmrs::proto::cosmos::base::query::v1beta1::PageRequest; use cosmwasm_std::Addr; use cw_orch_core::environment::{Querier, QuerierGetter}; use tokio::runtime::Handle; -use tonic::transport::Channel; /// Querier for the Cosmos Gov module /// All the async function are prefixed with `_` diff --git a/cw-orch-daemon/src/queriers/gov.rs b/cw-orch-daemon/src/queriers/gov.rs index 18cfcb91..ce8ec118 100644 --- a/cw-orch-daemon/src/queriers/gov.rs +++ b/cw-orch-daemon/src/queriers/gov.rs @@ -1,9 +1,9 @@ +use crate::Channel; use crate::{cosmos_modules, error::DaemonError, Daemon}; use cosmrs::proto::cosmos::base::query::v1beta1::PageRequest; use cosmwasm_std::Addr; use cw_orch_core::environment::{Querier, QuerierGetter}; use tokio::runtime::Handle; -use tonic::transport::Channel; /// Querier for the Cosmos Gov module /// All the async function are prefixed with `_` diff --git a/cw-orch-daemon/src/queriers/ibc.rs b/cw-orch-daemon/src/queriers/ibc.rs index 0c96f00b..3b2aaded 100644 --- a/cw-orch-daemon/src/queriers/ibc.rs +++ b/cw-orch-daemon/src/queriers/ibc.rs @@ -1,3 +1,4 @@ +use crate::Channel; use crate::{cosmos_modules, error::DaemonError, Daemon}; use cosmos_modules::ibc_channel; use cosmrs::proto::cosmos::base::query::v1beta1::PageRequest; @@ -13,8 +14,6 @@ use cosmrs::proto::ibc::{ use cw_orch_core::environment::{Querier, QuerierGetter}; use prost::Message; use tokio::runtime::Handle; -use tonic::transport::Channel; - /// Querier for the Cosmos IBC module /// All the async function are prefixed with `_` pub struct Ibc { diff --git a/cw-orch-daemon/src/queriers/node.rs b/cw-orch-daemon/src/queriers/node.rs index 0958da8a..68b6c3f9 100644 --- a/cw-orch-daemon/src/queriers/node.rs +++ b/cw-orch-daemon/src/queriers/node.rs @@ -5,6 +5,7 @@ use crate::{ tx_resp::CosmTxResponse, DaemonBase, }; +use crate::Channel; use cosmrs::{ proto::cosmos::{ base::query::v1beta1::PageRequest, @@ -18,8 +19,6 @@ use cw_orch_core::{ log::query_target, }; use tokio::runtime::Handle; -use tonic::transport::Channel; - /// Querier for the Tendermint node. /// Supports queries for block and tx information /// All the async function are prefixed with `_` diff --git a/cw-orch-daemon/src/queriers/staking.rs b/cw-orch-daemon/src/queriers/staking.rs index 98936ea4..19d92d0f 100644 --- a/cw-orch-daemon/src/queriers/staking.rs +++ b/cw-orch-daemon/src/queriers/staking.rs @@ -1,11 +1,11 @@ use std::fmt::Display; +use crate::Channel; use crate::{cosmos_modules, error::DaemonError, Daemon}; use cosmrs::proto::cosmos::base::query::v1beta1::PageRequest; use cosmwasm_std::{Addr, StdError}; use cw_orch_core::environment::{Querier, QuerierGetter}; use tokio::runtime::Handle; -use tonic::transport::Channel; use super::bank::cosmrs_to_cosmwasm_coin; diff --git a/cw-orch-daemon/src/senders/cosmos.rs b/cw-orch-daemon/src/senders/cosmos.rs index 1fba1340..2126e5ea 100644 --- a/cw-orch-daemon/src/senders/cosmos.rs +++ b/cw-orch-daemon/src/senders/cosmos.rs @@ -4,6 +4,7 @@ use super::{ sign::{Signer, SigningAccount}, tx::TxSender, }; +use crate::Channel; use crate::{ core::parse_cw_coins, cosmos_modules::{self, auth::BaseAccount}, @@ -33,7 +34,6 @@ use cw_orch_core::{ CoreEnvVars, CwEnvError, }; use std::{str::FromStr, sync::Arc}; -use tonic::transport::Channel; #[cfg(feature = "eth")] use crate::proto::injective::InjectiveSigner; diff --git a/cw-orch-daemon/src/senders/cosmos_batch.rs b/cw-orch-daemon/src/senders/cosmos_batch.rs index d939ff20..4001e165 100644 --- a/cw-orch-daemon/src/senders/cosmos_batch.rs +++ b/cw-orch-daemon/src/senders/cosmos_batch.rs @@ -1,4 +1,4 @@ -use crate::{DaemonBase, INSTANTIATE_2_TYPE_URL}; +use crate::{Channel, DaemonBase, INSTANTIATE_2_TYPE_URL}; use crate::{error::DaemonError, tx_resp::CosmTxResponse}; @@ -86,7 +86,7 @@ impl QuerySender for CosmosBatchSender { type Error = DaemonError; type Options = CosmosBatchOptions; - fn channel(&self) -> tonic::transport::Channel { + fn channel(&self) -> Channel { self.sender.channel() } } diff --git a/cw-orch-daemon/src/senders/query.rs b/cw-orch-daemon/src/senders/query.rs index 208c3f0a..2ccca67f 100644 --- a/cw-orch-daemon/src/senders/query.rs +++ b/cw-orch-daemon/src/senders/query.rs @@ -1,4 +1,4 @@ -use tonic::transport::Channel; +use crate::Channel; use crate::DaemonError; diff --git a/cw-orch-daemon/src/senders/query_only.rs b/cw-orch-daemon/src/senders/query_only.rs index 7238e1e1..0bc74fad 100644 --- a/cw-orch-daemon/src/senders/query_only.rs +++ b/cw-orch-daemon/src/senders/query_only.rs @@ -4,7 +4,7 @@ use crate::{error::DaemonError, DaemonBase, GrpcChannel}; use cw_orch_core::environment::ChainInfoOwned; -use tonic::transport::Channel; +use crate::Channel; use super::{builder::SenderBuilder, query::QuerySender}; diff --git a/cw-orch-daemon/src/service/attempts.rs b/cw-orch-daemon/src/service/attempts.rs new file mode 100644 index 00000000..b5107a79 --- /dev/null +++ b/cw-orch-daemon/src/service/attempts.rs @@ -0,0 +1,39 @@ +#[derive(Clone)] +pub enum Attempts { + Unlimited, + Count(usize), +} + +impl std::fmt::Display for Attempts { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Attempts::Unlimited => write!(f, "unlimited")?, + Attempts::Count(count) => write!(f, "{}", count)?, + } + Ok(()) + } +} + +impl Attempts { + pub fn can_retry(&self) -> bool { + match self { + Attempts::Unlimited => true, + Attempts::Count(count) => *count > 0, + } + } + + /// Verifies the attempt can retry + /// If it can retry, decrements the counter + pub fn retry(&mut self) -> bool { + let can_retry = self.can_retry(); + if can_retry { + self.decrement(); + } + can_retry + } + fn decrement(&mut self) { + if let Attempts::Count(count) = self { + *count -= 1 + } + } +} diff --git a/cw-orch-daemon/src/service/mod.rs b/cw-orch-daemon/src/service/mod.rs new file mode 100644 index 00000000..3effe0a2 --- /dev/null +++ b/cw-orch-daemon/src/service/mod.rs @@ -0,0 +1,3 @@ +pub mod attempts; +pub mod reconnect; +pub mod retry; diff --git a/cw-orch-daemon/src/service/reconnect/factory.rs b/cw-orch-daemon/src/service/reconnect/factory.rs new file mode 100644 index 00000000..dcac9097 --- /dev/null +++ b/cw-orch-daemon/src/service/reconnect/factory.rs @@ -0,0 +1,28 @@ +use std::{future::Future, pin::Pin}; + +use tower::Service; + +use crate::{DaemonError, GrpcChannel, TowerChannel}; + +pub type ChannelCreationArgs = (Vec, String); +#[derive(Clone)] +pub struct ChannelFactory {} + +impl Service for ChannelFactory { + type Response = TowerChannel; + + type Error = DaemonError; + + type Future = Pin> + Send>>; + + fn poll_ready( + &mut self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + + fn call(&mut self, req: ChannelCreationArgs) -> Self::Future { + Box::pin(async move { GrpcChannel::get_channel(req.0.as_ref(), &req.1).await }) + } +} diff --git a/cw-orch-daemon/src/service/reconnect/future.rs b/cw-orch-daemon/src/service/reconnect/future.rs new file mode 100644 index 00000000..b0f2a7d0 --- /dev/null +++ b/cw-orch-daemon/src/service/reconnect/future.rs @@ -0,0 +1,74 @@ +use pin_project_lite::pin_project; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use tower::BoxError; + +pin_project! { + /// Future that resolves to the response or failure to connect. + #[derive(Debug)] + pub struct ResponseFuture { + #[pin] + inner: Inner, + } +} + +pin_project! { + #[project = InnerProj] + #[derive(Debug)] + enum Inner { + Future { + #[pin] + fut: F, + }, + Error { + error: Option, + }, + } +} + +impl Inner { + fn future(fut: F) -> Self { + Self::Future { fut } + } + + fn error(error: Option) -> Self { + Self::Error { error } + } +} + +impl ResponseFuture { + pub(crate) fn new(inner: F) -> Self { + ResponseFuture { + inner: Inner::future(inner), + } + } + + pub(crate) fn error(error: E) -> Self { + ResponseFuture { + inner: Inner::error(Some(error)), + } + } +} + +impl Future for ResponseFuture +where + F: Future>, + E: Into, + ME: Into, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let me = self.project(); + match me.inner.project() { + InnerProj::Future { fut } => fut.poll(cx).map_err(Into::into), + InnerProj::Error { error } => { + let e = error.take().expect("Polled after ready.").into(); + Poll::Ready(Err(e)) + } + } + } +} diff --git a/cw-orch-daemon/src/service/reconnect/mod.rs b/cw-orch-daemon/src/service/reconnect/mod.rs new file mode 100644 index 00000000..e497d0c6 --- /dev/null +++ b/cw-orch-daemon/src/service/reconnect/mod.rs @@ -0,0 +1,225 @@ +pub mod factory; +pub mod future; +use future::ResponseFuture; +use log::{debug, trace}; +use std::sync::{Arc, Mutex}; +use std::thread::sleep; +use std::time::Duration; +use std::{fmt, mem}; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use tower::{BoxError, Service}; + +pub use factory::ChannelCreationArgs; +pub use factory::ChannelFactory; + +use super::attempts::Attempts; + +/// Reconnect to failed services. +pub struct Reconnect +where + M: Service, + M::Error: std::fmt::Debug, + + M: Sync, + Target: Sync, +{ + mk_service: M, + #[allow(clippy::type_complexity)] + state: Arc>>, + target: Target, + attempts: Attempts, +} + +impl Clone for Reconnect +where + M: Service, + M::Error: std::fmt::Debug, + M: Clone, + Target: Clone, + M: Sync, + Target: Sync, +{ + fn clone(&self) -> Self { + Self { + mk_service: self.mk_service.clone(), + state: self.state.clone(), + target: self.target.clone(), + attempts: self.attempts.clone(), + } + } +} + +#[derive(Debug)] +enum State { + Error(E), + Idle, + Connecting(F), + Connected(S), +} + +impl State { + pub(crate) fn unwrap_err(self) -> E { + match self { + State::Error(e) => e, + _ => panic!("Not error"), + } + } +} + +impl Reconnect +where + M: Service, + M::Error: std::fmt::Debug, + M: Sync, + Target: Sync, +{ + /// Lazily connect and reconnect to a [`Service`]. + pub fn new(mk_service: M, target: Target) -> Self { + Reconnect { + mk_service, + state: Arc::new(Mutex::new(State::Idle)), + target, + attempts: Attempts::Unlimited, + } + } + + /// Reconnect to a already connected [`Service`]. + pub fn with_connection(init_conn: M::Response, mk_service: M, target: Target) -> Self { + Reconnect { + mk_service, + state: Arc::new(Mutex::new(State::Connected(init_conn))), + target, + attempts: Attempts::Unlimited, + } + } + + pub fn with_attemps(mut self, attempts: usize) -> Self { + self.attempts = Attempts::Count(attempts); + self + } +} + +impl Service for Reconnect +where + M: Service + Sync, + M::Error: std::fmt::Debug, + M::Future: Unpin, + BoxError: From + From, + + S: Service, + Target: Clone + Sync, +{ + type Response = S::Response; + type Error = BoxError; + type Future = ResponseFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + loop { + let mut state = self.state.lock().unwrap(); + match &mut *state { + State::Idle | State::Error(_) => { + trace!("poll_ready; idle"); + match self.mk_service.poll_ready(cx) { + Poll::Ready(r) => r?, + Poll::Pending => { + trace!("poll_ready; MakeService not ready"); + return Poll::Pending; + } + } + let fut = self.mk_service.call(self.target.clone()); + drop(state); + self.state = Arc::new(Mutex::new(State::Connecting(fut))); + continue; + } + State::Connecting(ref mut f) => { + trace!("poll_ready; connecting"); + match Pin::new(f).poll(cx) { + Poll::Ready(Ok(service)) => { + drop(state); + self.state = Arc::new(Mutex::new(State::Connected(service))); + } + Poll::Pending => { + trace!("poll_ready; not ready"); + return Poll::Pending; + } + Poll::Ready(Err(e)) => { + drop(state); + if self.attempts.retry() { + trace!("poll_ready; error"); + debug!( + "Connection error, retrying in {} seconds, {} attemps left", + 5, self.attempts + ); + self.state = Arc::new(Mutex::new(State::Error(e))); + sleep(Duration::from_secs(5)); + } else { + self.state = Arc::new(Mutex::new(State::Error(e))); + + break; + } + } + } + } + State::Connected(ref mut inner) => { + trace!("poll_ready; connected"); + match inner.poll_ready(cx) { + Poll::Ready(Ok(())) => { + trace!("poll_ready; ready"); + return Poll::Ready(Ok(())); + } + Poll::Pending => { + trace!("poll_ready; not ready"); + return Poll::Pending; + } + Poll::Ready(Err(_)) => { + trace!("poll_ready; error"); + + drop(state); + self.state = Arc::new(Mutex::new(State::Idle)); + } + } + } + } + } + + Poll::Ready(Ok(())) + } + + fn call(&mut self, request: Request) -> Self::Future { + let mut state = self.state.lock().unwrap(); + let service = match &mut *state { + State::Connected(ref mut service) => service, + State::Error(_) => { + let state = mem::replace(&mut *state, State::Idle); + return ResponseFuture::error(state.unwrap_err()); + } + _ => panic!("service not ready; poll_ready must be called first"), + }; + + let fut = service.call(request); + ResponseFuture::new(fut) + } +} + +impl fmt::Debug for Reconnect +where + M: Service + fmt::Debug, + M::Future: fmt::Debug, + M::Response: fmt::Debug, + Target: fmt::Debug, + M::Error: std::fmt::Debug, + M: Sync, + Target: Sync, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Reconnect") + .field("mk_service", &self.mk_service) + .field("state", &self.state) + .field("target", &self.target) + .finish() + } +} diff --git a/cw-orch-daemon/src/service/retry/future.rs b/cw-orch-daemon/src/service/retry/future.rs new file mode 100644 index 00000000..159251ab --- /dev/null +++ b/cw-orch-daemon/src/service/retry/future.rs @@ -0,0 +1,121 @@ +//! Future types + +use super::policy::Policy; +use super::Retry; +use futures_core::ready; +use pin_project_lite::pin_project; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tower::Service; + +pin_project! { + /// The [`Future`] returned by a [`Retry`] service. + #[derive(Debug)] + pub struct ResponseFuture + where + P: Policy, + S: Service, + { + request: Option, + #[pin] + retry: Retry, + #[pin] + state: State, + } +} + +pin_project! { + #[project = StateProj] + #[derive(Debug)] + enum State { + // Polling the future from [`Service::call`] + Called { + #[pin] + future: F + }, + // Polling the future from [`Policy::retry`] + Waiting { + #[pin] + waiting: P + }, + // Polling [`Service::poll_ready`] after [`Waiting`] was OK. + Retrying, + } +} + +impl ResponseFuture +where + P: Policy, + S: Service, +{ + pub(crate) fn new( + request: Option, + retry: Retry, + future: S::Future, + ) -> ResponseFuture { + ResponseFuture { + request, + retry, + state: State::Called { future }, + } + } +} + +impl Future for ResponseFuture +where + P: Policy, + S: Service, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + loop { + match this.state.as_mut().project() { + StateProj::Called { future } => { + let mut result = ready!(future.poll(cx)); + if let Some(req) = &mut this.request { + match this.retry.policy.retry(req, &mut result) { + Some(waiting) => { + this.state.set(State::Waiting { waiting }); + } + None => return Poll::Ready(result), + } + } else { + // request wasn't cloned, so no way to retry it + return Poll::Ready(result); + } + } + StateProj::Waiting { waiting } => { + ready!(waiting.poll(cx)); + + this.state.set(State::Retrying); + } + StateProj::Retrying => { + // NOTE: we assume here that + // + // this.retry.poll_ready() + // + // is equivalent to + // + // this.retry.service.poll_ready() + // + // we need to make that assumption to avoid adding an Unpin bound to the Policy + // in Ready to make it Unpin so that we can get &mut Ready as needed to call + // poll_ready on it. + ready!(this.retry.as_mut().project().service.poll_ready(cx))?; + let mut req = this + .request + .take() + .expect("retrying requires cloned request"); + (req, *this.request) = this.retry.policy.clone_request(req); + this.state.set(State::Called { + future: this.retry.as_mut().project().service.call(req), + }); + } + } + } + } +} diff --git a/cw-orch-daemon/src/service/retry/implementation.rs b/cw-orch-daemon/src/service/retry/implementation.rs new file mode 100644 index 00000000..4ac0c14e --- /dev/null +++ b/cw-orch-daemon/src/service/retry/implementation.rs @@ -0,0 +1,110 @@ +use crate::service::attempts::Attempts; + +use super::Policy; +use futures::TryStreamExt; +use futures_util::future; +use http::{request::Parts, Request}; +use http_body_util::{BodyExt, Full}; +use hyper::body::Bytes; +use tonic::body::BoxBody; + +type Req = http::Request; +type Res = http::Response; + +#[derive(Clone)] +pub struct RetryAttemps(pub Attempts); + +impl RetryAttemps { + pub fn unlimited() -> Self { + Self(Attempts::Unlimited) + } + pub fn count(count: usize) -> Self { + Self(Attempts::Count(count)) + } +} + +impl From for RetryAttemps { + fn from(value: Attempts) -> Self { + Self(value) + } +} + +impl Policy for RetryAttemps { + type Future = future::Ready<()>; + + fn retry(&mut self, _req: &mut Req, result: &mut Result) -> Option { + match result { + Ok(_) => { + log::trace!("Entering the middleware ok"); + // Treat all `Response`s as success, + // so don't retry... + None + } + Err(_) => { + log::trace!("Entering the middleware error"); + // Treat all errors as failures... + // But we limit the number of attempts... + if self.0.retry() { + log::trace!("Try this again, there was a failure"); + // Try again! + Some(future::ready(())) + } else { + // Used all our attempts, no retry... + None + } + } + } + } + + fn clone_request(&mut self, req: Req) -> (Req, Option) { + // Convert body to Bytes so it can be cloned + let (parts, original_body) = req.into_parts(); + + // Try to capture the Bytes from the original body + // This is circumvoluted, I'm not sure how to call an async function within a sync function that is used inside a future later + let bytes = futures::executor::block_on(async move { + tokio::runtime::Handle::current() + .spawn(async move { consume_unsync_body(original_body).await }) + .await + .unwrap() + }); + + // Re-create the request with the captured bytes in a new BoxBody + let req = create_request(parts.clone(), bytes.clone()); + let cloned_req = create_request(parts, bytes); + + (req, Some(cloned_req)) + // Some(req.clone()) + } +} + +async fn consume_unsync_body(body: BoxBody) -> Vec { + // Accumulate bytes asynchronously + + body.into_data_stream() + .try_fold(Vec::new(), |mut acc, chunk| async move { + acc.extend_from_slice(&chunk); + Ok(acc) + }) + .await + .unwrap() +} + +fn create_request(parts: Parts, body: Vec) -> http::Request { + let bytes = Bytes::from(body); + let full_body = Full::new(bytes); + let mut request = Request::builder() + .method(parts.method) + .uri(parts.uri) + .version(parts.version) + .body( + full_body + .map_err(|_err| tonic::Status::internal("Body error")) + .boxed_unsync(), + ) + .unwrap(); + + *request.headers_mut() = parts.headers; + + request +} diff --git a/cw-orch-daemon/src/service/retry/layer.rs b/cw-orch-daemon/src/service/retry/layer.rs new file mode 100644 index 00000000..65a15e02 --- /dev/null +++ b/cw-orch-daemon/src/service/retry/layer.rs @@ -0,0 +1,27 @@ +use super::Retry; +use tower::Layer; + +/// Retry requests based on a policy +#[derive(Debug, Clone)] +pub struct RetryLayer

{ + policy: P, +} + +impl

RetryLayer

{ + /// Creates a new [`RetryLayer`] from a retry policy. + pub const fn new(policy: P) -> Self { + RetryLayer { policy } + } +} + +impl Layer for RetryLayer

+where + P: Clone, +{ + type Service = Retry; + + fn layer(&self, service: S) -> Self::Service { + let policy = self.policy.clone(); + Retry::new(policy, service) + } +} diff --git a/cw-orch-daemon/src/service/retry/mod.rs b/cw-orch-daemon/src/service/retry/mod.rs new file mode 100644 index 00000000..af466be4 --- /dev/null +++ b/cw-orch-daemon/src/service/retry/mod.rs @@ -0,0 +1,91 @@ +pub mod future; +pub mod implementation; +pub mod layer; +pub mod policy; +pub use implementation::RetryAttemps; +pub use layer::RetryLayer; +pub use policy::Policy; + +use crate::service::retry::future::ResponseFuture; +use pin_project_lite::pin_project; +use std::task::{Context, Poll}; +use tower::Service; + +pin_project! { + /// Configure retrying requests of "failed" responses. + /// + /// A [`Policy`] classifies what is a "failed" response. + /// + /// # Clone + /// + /// This middleware requires that the inner `Service` implements [`Clone`], + /// because the `Service` must be stored in each [`ResponseFuture`] in + /// order to retry the request in the event of a failure. If the inner + /// `Service` type does not implement `Clone`, the [`Buffer`] middleware + /// can be added to make any `Service` cloneable. + /// + /// [`Buffer`]: crate::buffer::Buffer + /// + /// The `Policy` must also implement `Clone`. This middleware will + /// clone the policy for each _request session_. This means a new clone + /// of the policy will be created for each initial request and any subsequent + /// retries of that request. Therefore, any state stored in the `Policy` instance + /// is for that request session only. In order to share data across request + /// sessions, that shared state may be stored in an [`Arc`], so that all clones + /// of the `Policy` type reference the same instance of the shared state. + /// + /// [`Arc`]: std::sync::Arc + #[derive(Clone, Debug)] + pub struct Retry { + policy: P, + service: S, + } +} + +// ===== impl Retry ===== + +impl Retry { + /// Retry the inner service depending on this [`Policy`]. + pub const fn new(policy: P, service: S) -> Self { + Retry { policy, service } + } + + /// Get a reference to the inner service + pub fn get_ref(&self) -> &S { + &self.service + } + + /// Get a mutable reference to the inner service + pub fn get_mut(&mut self) -> &mut S { + &mut self.service + } + + /// Consume `self`, returning the inner service + pub fn into_inner(self) -> S { + self.service + } +} + +impl Service for Retry +where + P: Policy + Clone, + S: Service + Clone, +{ + type Response = S::Response; + type Error = S::Error; + type Future = ResponseFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + // NOTE: the Future::poll impl for ResponseFuture assumes that Retry::poll_ready is + // equivalent to Ready.service.poll_ready. If this ever changes, that code must be updated + // as well. + self.service.poll_ready(cx) + } + + fn call(&mut self, request: Request) -> Self::Future { + let (request, cloned) = self.policy.clone_request(request); + let future = self.service.call(request); + + ResponseFuture::new(cloned, self.clone(), future) + } +} diff --git a/cw-orch-daemon/src/service/retry/policy.rs b/cw-orch-daemon/src/service/retry/policy.rs new file mode 100644 index 00000000..7f300794 --- /dev/null +++ b/cw-orch-daemon/src/service/retry/policy.rs @@ -0,0 +1,94 @@ +use std::future::Future; + +/// A "retry policy" to classify if a request should be retried. +/// +/// # Example +/// +/// ``` +/// use tower::retry::Policy; +/// use futures_util::future; +/// +/// type Req = String; +/// type Res = String; +/// +/// struct Attempts(usize); +/// +/// impl Policy for Attempts { +/// type Future = future::Ready<()>; +/// +/// fn retry(&mut self, req: &mut Req, result: &mut Result) -> Option { +/// match result { +/// Ok(_) => { +/// // Treat all `Response`s as success, +/// // so don't retry... +/// None +/// }, +/// Err(_) => { +/// // Treat all errors as failures... +/// // But we limit the number of attempts... +/// if self.0 > 0 { +/// // Try again! +/// self.0 -= 1; +/// Some(future::ready(())) +/// } else { +/// // Used all our attempts, no retry... +/// None +/// } +/// } +/// } +/// } +/// +/// fn clone_request(&mut self, req: &Req) -> Option { +/// Some(req.clone()) +/// } +/// } +/// ``` +pub trait Policy { + /// The [`Future`] type returned by [`Policy::retry`]. + type Future: Future; + + /// Check the policy if a certain request should be retried. + /// + /// This method is passed a reference to the original request, and either + /// the [`Service::Response`] or [`Service::Error`] from the inner service. + /// + /// If the request should **not** be retried, return `None`. + /// + /// If the request *should* be retried, return `Some` future that will delay + /// the next retry of the request. This can be used to sleep for a certain + /// duration, to wait for some external condition to be met before retrying, + /// or resolve right away, if the request should be retried immediately. + /// + /// ## Mutating Requests + /// + /// The policy MAY chose to mutate the `req`: if the request is mutated, the + /// mutated request will be sent to the inner service in the next retry. + /// This can be helpful for use cases like tracking the retry count in a + /// header. + /// + /// ## Mutating Results + /// + /// The policy MAY chose to mutate the result. This enables the retry + /// policy to convert a failure into a success and vice versa. For example, + /// if the policy is used to poll while waiting for a state change, the + /// policy can switch the result to emit a specific error when retries are + /// exhausted. + /// + /// The policy can also record metadata on the request to include + /// information about the number of retries required or to record that a + /// failure failed after exhausting all retries. + /// + /// [`Service::Response`]: crate::Service::Response + /// [`Service::Error`]: crate::Service::Error + fn retry(&mut self, req: &mut Req, result: &mut Result) -> Option; + + /// Tries to clone a request before being passed to the inner service. + /// + /// If the request cannot be cloned, return [`None`]. Moreover, the retry + /// function will not be called if the [`None`] is returned. + fn clone_request(&mut self, req: Req) -> (Req, Option); +} + +// Ensure `Policy` is object safe +#[cfg(test)] +fn _obj_safe(_: Box>>) {} diff --git a/cw-orch-daemon/src/sync/core.rs b/cw-orch-daemon/src/sync/core.rs index 7320cc17..39c73206 100644 --- a/cw-orch-daemon/src/sync/core.rs +++ b/cw-orch-daemon/src/sync/core.rs @@ -1,6 +1,7 @@ use std::{fmt::Debug, ops::DerefMut}; use super::super::senders::Wallet; +use crate::Channel; use crate::{ queriers::{Bank, CosmWasmBase, Node}, senders::{builder::SenderBuilder, query::QuerySender}, @@ -14,7 +15,6 @@ use cw_orch_core::{ use cw_orch_traits::stargate::Stargate; use serde::Serialize; use tokio::runtime::Handle; -use tonic::transport::Channel; use crate::senders::tx::TxSender; diff --git a/cw-orch-daemon/tests/querier.rs b/cw-orch-daemon/tests/querier.rs index 9bc334d1..78366918 100644 --- a/cw-orch-daemon/tests/querier.rs +++ b/cw-orch-daemon/tests/querier.rs @@ -23,18 +23,15 @@ mod queriers { tx::{self, Msg}, AccountId, Denom, }; + use cw_orch_daemon::Channel; - pub async fn build_channel() -> tonic::transport::Channel { + pub async fn build_channel() -> Channel { let network = networks::LOCAL_JUNO; let grpcs = vec![network.grpc_urls[0].into()]; let channel = GrpcChannel::connect(&grpcs, network.chain_id).await; - asserting!("channel connection is succesful") - .that(&channel) - .is_ok(); - channel.unwrap() } diff --git a/packages/clone-testing/src/core.rs b/packages/clone-testing/src/core.rs index 9ff00b39..233bc113 100644 --- a/packages/clone-testing/src/core.rs +++ b/packages/clone-testing/src/core.rs @@ -17,7 +17,9 @@ use cw_orch_core::{ }, CwEnvError, }; -use cw_orch_daemon::{queriers::Node, read_network_config, DEFAULT_DEPLOYMENT, RUNTIME}; +use cw_orch_daemon::{ + queriers::Node, read_network_config, GrpcChannel, DEFAULT_DEPLOYMENT, RUNTIME, +}; use cw_utils::NativeBalance; use serde::Serialize; use tokio::runtime::Runtime; @@ -235,10 +237,11 @@ impl CloneTesting { let bank = BankKeeper::new().with_remote(remote_channel.clone()); - // We update the block_height + // We update the block_height, and open a second channel just for that (to make sure we are not too dependent on clone-cw-multi-test deps) + let node_channel = rt.block_on(GrpcChannel::from_chain_info(&chain))?; let block_info = remote_channel .rt - .block_on(Node::new_async(remote_channel.channel.clone())._block_info()) + .block_on(Node::new_async(node_channel)._block_info()) .unwrap(); // Finally we instantiate a new app diff --git a/packages/interchain/interchain-core/src/channel.rs b/packages/interchain/interchain-core/src/channel.rs index 98e4320d..365299d8 100644 --- a/packages/interchain/interchain-core/src/channel.rs +++ b/packages/interchain/interchain-core/src/channel.rs @@ -12,7 +12,7 @@ use crate::InterchainError; /// Identifies a channel between two IBC connected chains. /// This describes only 1 side of the channel -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct IbcPort { /// The chain id of the network which belongs on one side of the channel pub chain_id: NetworkId, @@ -31,10 +31,21 @@ pub struct IbcPort { pub chain: Channel, } +impl std::fmt::Debug for IbcPort { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("IbcPort") + .field("chain_id", &self.chain_id) + .field("connection_id", &self.connection_id) + .field("port", &self.port) + .field("channel", &self.channel) + .finish() + } +} + /// Store information about a channel between 2 blockchains /// The order of port_a and port_b is not important /// Even if there is a src and dst chain, the order for an IBC channel doesn't matter -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct InterchainChannel where Channel: Clone + Send + Sync, @@ -45,6 +56,18 @@ where pub port_b: IbcPort, } +impl std::fmt::Debug for InterchainChannel +where + Channel: Clone + Send + Sync, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("InterchainChannel") + .field("port_a", &self.port_a) + .field("port_b", &self.port_b) + .finish() + } +} + // TODO some of those queries may be implemented (or are already implemented) in the IBC querier file ? impl InterchainChannel where diff --git a/packages/interchain/interchain-core/src/ibc_query.rs b/packages/interchain/interchain-core/src/ibc_query.rs index b22e6c93..2c31b4b2 100644 --- a/packages/interchain/interchain-core/src/ibc_query.rs +++ b/packages/interchain/interchain-core/src/ibc_query.rs @@ -2,6 +2,7 @@ use crate::results::NetworkId; use cosmwasm_std::Api; use cw_orch_core::environment::CwEnv; use cw_orch_core::environment::QueryHandler; +use cw_orch_daemon::Channel; use cw_orch_mock::{MockBase, MockState}; /// Adds additional capabilities to CwEnv for use with ibc environments @@ -20,9 +21,9 @@ pub trait IbcQueryHandler: CwEnv { #[cfg(feature = "daemon")] // Temporary until we can actually push to cw-orch-daemon impl IbcQueryHandler for cw_orch_daemon::Daemon { - type Handler = tonic::transport::Channel; + type Handler = Channel; - fn ibc_handler(&self) -> tonic::transport::Channel { + fn ibc_handler(&self) -> Channel { self.channel() } diff --git a/packages/interchain/interchain-daemon/src/error.rs b/packages/interchain/interchain-daemon/src/error.rs index f31fefc7..314e3f72 100644 --- a/packages/interchain/interchain-daemon/src/error.rs +++ b/packages/interchain/interchain-daemon/src/error.rs @@ -1,9 +1,8 @@ #![allow(missing_docs)] use cosmwasm_std::StdError; -use cw_orch_interchain_core::{channel::InterchainChannel, results::NetworkId, InterchainError}; +use cw_orch_interchain_core::{results::NetworkId, InterchainError}; use thiserror::Error; -use tonic::transport::Channel; #[derive(Error, Debug)] pub enum InterchainDaemonError { @@ -40,7 +39,8 @@ pub enum InterchainDaemonError { #[error("Channel creation events not found from chain {src_chain} on following channel : {channel:?}")] ChannelCreationEventsNotFound { src_chain: NetworkId, - channel: InterchainChannel, + /// Meant to contain InterchainChannel, but this is a large variant so used a string to remove warning + channel: String, }, #[error("Configuration already registered for chain {0}")] diff --git a/packages/interchain/interchain-daemon/src/ibc_tracker.rs b/packages/interchain/interchain-daemon/src/ibc_tracker.rs index 52ccb6a3..f775cb15 100644 --- a/packages/interchain/interchain-daemon/src/ibc_tracker.rs +++ b/packages/interchain/interchain-daemon/src/ibc_tracker.rs @@ -5,7 +5,7 @@ use cosmrs::proto::ibc::core::channel::v1::State; use cw_orch_core::contract::interface_traits::ContractInstance; use cw_orch_core::environment::Environment; use cw_orch_daemon::queriers::{Ibc, Node}; -use cw_orch_daemon::Daemon; +use cw_orch_daemon::{Channel, Daemon}; use cw_orch_interchain_core::env::contract_port; use diff::Diff; use futures_util::future::join_all; @@ -18,7 +18,7 @@ use std::collections::HashMap; use std::collections::HashSet; use std::error::Error; use std::{fmt::Display, time::Duration}; -use tonic::{async_trait, transport::Channel}; +use tonic::async_trait; use self::logged_state::LoggedState; @@ -117,8 +117,9 @@ mod logged_state { fmt::{Debug, Display}, }; + use cw_orch_daemon::Channel; use diff::Diff; - use tonic::{async_trait, transport::Channel}; + use tonic::async_trait; #[async_trait] pub trait LoggedState: diff --git a/packages/interchain/interchain-daemon/src/interchain_env.rs b/packages/interchain/interchain-daemon/src/interchain_env.rs index f7ed82fc..e1d31c7d 100644 --- a/packages/interchain/interchain-daemon/src/interchain_env.rs +++ b/packages/interchain/interchain-daemon/src/interchain_env.rs @@ -6,9 +6,9 @@ use cw_orch_interchain_core::channel::{IbcPort, InterchainChannel}; use cw_orch_interchain_core::env::{ChainId, ChannelCreation}; use cw_orch_interchain_core::{InterchainEnv, NestedPacketsFlow, SinglePacketFlow}; +use cw_orch_daemon::Channel; use ibc_relayer_types::core::ics04_channel::packet::Sequence; use tokio::time::sleep; -use tonic::transport::Channel; use crate::channel_creator::{ChannelCreationValidator, ChannelCreator}; use crate::interchain_log::InterchainLog; @@ -367,7 +367,7 @@ impl DaemonInterchain { Err(InterchainDaemonError::ChannelCreationEventsNotFound { src_chain: src_chain.to_string(), - channel: ibc_channel.clone(), + channel: format!("{ibc_channel:?}"), }) } diff --git a/packages/interchain/interchain-daemon/src/packet_inspector.rs b/packages/interchain/interchain-daemon/src/packet_inspector.rs index c72d4f03..0f0260bf 100644 --- a/packages/interchain/interchain-daemon/src/packet_inspector.rs +++ b/packages/interchain/interchain-daemon/src/packet_inspector.rs @@ -18,10 +18,10 @@ use futures_util::FutureExt; use crate::{IcDaemonResult, InterchainDaemonError}; use cw_orch_interchain_core::results::NetworkId; +use cw_orch_daemon::Channel; use futures::future::try_join_all; use ibc_relayer_types::core::ics04_channel::packet::Sequence; use ibc_relayer_types::core::ics24_host::identifier::{ChannelId, PortId}; -use tonic::transport::Channel; use std::collections::HashMap; diff --git a/packages/interchain/proto/Cargo.toml b/packages/interchain/proto/Cargo.toml index 28324984..c1089ca9 100644 --- a/packages/interchain/proto/Cargo.toml +++ b/packages/interchain/proto/Cargo.toml @@ -13,6 +13,7 @@ repository.workspace = true cw-orch-interchain-core = { workspace = true } cw-orch-traits = { workspace = true } cw-orch-core = { workspace = true } +cw-orch-daemon = { workspace = true } anyhow = { workspace = true } diff --git a/packages/interchain/proto/src/tokenfactory.rs b/packages/interchain/proto/src/tokenfactory.rs index eeaefd93..066d1f5b 100644 --- a/packages/interchain/proto/src/tokenfactory.rs +++ b/packages/interchain/proto/src/tokenfactory.rs @@ -1,12 +1,12 @@ #![allow(non_snake_case)] +use cw_orch_daemon::Channel; use cw_orch_interchain_core::{ channel::InterchainChannel, IbcQueryHandler, InterchainEnv, InterchainError, NestedPacketsFlow, }; use ibc_proto::ibc::apps::transfer::v1::MsgTransfer; use osmosis_std::types::osmosis::tokenfactory::v1beta1::{MsgCreateDenom, MsgMint}; use prost::{Message, Name}; -use tonic::transport::Channel; use cosmwasm_std::Coin; use cw_orch_core::environment::{CwEnv, TxHandler};