diff --git a/Cargo.toml b/Cargo.toml index 7a0c4fa..daadb65 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,20 +18,20 @@ serde_json = "1" # Optional dependencies bytes = { version = "1", optional = true } -reqwest = { version = "0.12.23", optional = true } +bitreq = { version = "0.2.0", features = ["async-https"], optional = true } tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"], optional = true } [dev-dependencies] anyhow = "1" futures = { version = "0.3"} log = "0.4" -mempool_space_api = { path = ".", features = ["reqwest"] } +mempool_space_api = { path = ".", features = ["bitreq"] } miniscript = { version = "12" } pretty_env_logger = "0.5.0" [features] default = [] -reqwest = ["dep:reqwest", "tokio", "bytes"] +bitreq = ["dep:bitreq", "tokio", "bytes"] [[example]] name = "client" diff --git a/examples/client.rs b/examples/client.rs index edc2080..79182c6 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -7,8 +7,8 @@ const URL: &str = "https://mempool.space/api"; #[tokio::main] async fn main() -> anyhow::Result<()> { - let reqwest_client = mempool_space_api::ReqwestClient::default(); - let client = AsyncClient::new(URL, &reqwest_client); + let bitreq_client = mempool_space_api::BitreqClient::default(); + let client = AsyncClient::new(URL, &bitreq_client); // GET /blocks/tip/height. let res = client.get_tip_height().await?; diff --git a/examples/sync.rs b/examples/sync.rs index 2a33bdb..b64db0f 100644 --- a/examples/sync.rs +++ b/examples/sync.rs @@ -6,13 +6,14 @@ use std::sync::Arc; /// Server url. const URL: &str = "https://mempool.space/signet/api"; +const STOP_GAP: u32 = 10; #[tokio::main] async fn main() -> anyhow::Result<()> { pretty_env_logger::init_timed(); - let reqwest_client = mempool_space_api::ReqwestClient::default(); - let client = AsyncClient::new(URL, &reqwest_client); + let bitreq_client = mempool_space_api::BitreqClient::default(); + let client = AsyncClient::new(URL, &bitreq_client); let desc_str = "wpkh(tprv8ZgxMBicQKsPdy6LMhUtFHAgpocR8GC6QmwMSFpZs7h6Eziw3SpThFfczTDh5rW2krkqffa11UpX3XkeTTB2FvzZKWXqPY54Y6Rq4AQ5R8L/84'/1'/0'/0/*)"; let secp = secp256k1::Secp256k1::new(); @@ -51,6 +52,10 @@ async fn main() -> anyhow::Result<()> { }) .collect::>(); + if futures.is_empty() { + break; + } + for (index, script, txs) in futures.try_collect::>().await? { if txs.is_empty() { unused_ct += 1; @@ -65,7 +70,7 @@ async fn main() -> anyhow::Result<()> { } // Gap limit reached - if unused_ct > 20 { + if unused_ct > STOP_GAP { log::info!("Last active index {:?}", last_active); break; } diff --git a/src/bitreq_client.rs b/src/bitreq_client.rs new file mode 100644 index 0000000..8168735 --- /dev/null +++ b/src/bitreq_client.rs @@ -0,0 +1,134 @@ +use core::fmt; + +use bitreq::{Request, Response}; +use bytes::Bytes; + +use crate::{Http, HttpMethod}; + +pub extern crate bitreq; +pub extern crate tokio; + +/// Base backoff in milliseconds. +const BASE_BACKOFF_MILLIS: u64 = 256; +/// Default max retries. +const DEFAULT_MAX_RETRIES: u32 = 10; + +/// HTTP client implementation. +#[derive(Debug)] +pub struct BitreqClient { + /// The maximum number of times to retry a failed request. + /// In the future this may be a configurable option. + max_retries: u32, +} + +impl BitreqClient { + /// New. + pub fn new() -> Self { + Self::default() + } +} + +impl Default for BitreqClient { + fn default() -> Self { + Self { + max_retries: DEFAULT_MAX_RETRIES, + } + } +} + +impl Http for BitreqClient { + type Body = Bytes; + + type Err = BitreqError; + + async fn send<'a>( + &'a self, + method: HttpMethod, + url: &'a str, + body: impl Into, + ) -> Result + where + Self: 'a, + { + let resp = self.send_retry(method.into(), url, body.into()).await?; + + if !is_status_ok(resp.status_code) { + return Err(BitreqError::HttpResponse { + status: resp.status_code, + message: resp.reason_phrase, + }); + } + + Ok(resp.into_bytes().into()) + } +} + +impl BitreqClient { + /// Sends a request and allows for retrying failed attempts. See [`is_status_retryable`]. + async fn send_retry( + &self, + method: bitreq::Method, + url: &str, + body: Bytes, + ) -> Result { + let mut delay = BASE_BACKOFF_MILLIS; + let mut attempts = 0; + + loop { + match Request::new(method.clone(), url) + .with_body(body.clone()) + .send_async() + .await? + { + resp if attempts < self.max_retries && is_status_retryable(resp.status_code) => { + tokio::time::sleep(std::time::Duration::from_millis(delay)).await; + delay *= 2; + attempts += 1; + } + resp => return Ok(resp), + } + } + } +} + +/// Whether the response status indicates a failure which can be retried. +/// +/// Currently includes: +/// +/// - `429`: TOO_MANY_REQUESTS +/// - `500`: INTERNAL_SERVER_ERROR +/// - `503`: SERVICE_UNAVAILABLE +fn is_status_retryable(status: i32) -> bool { + [429, 500, 503].contains(&status) +} + +// Whether the response status code is `200 OK`. +fn is_status_ok(status: i32) -> bool { + status == 200 +} + +/// Error for `BitreqClient` +#[derive(Debug)] +pub enum BitreqError { + /// `bitreq` error. + Bitreq(bitreq::Error), + /// Reponse error. + HttpResponse { status: i32, message: String }, +} + +impl fmt::Display for BitreqError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Bitreq(e) => write!(f, "{e}"), + Self::HttpResponse { status, message } => write!(f, "{status} {message}"), + } + } +} + +impl std::error::Error for BitreqError {} + +impl From for BitreqError { + fn from(e: bitreq::Error) -> Self { + Self::Bitreq(e) + } +} diff --git a/src/client.rs b/src/client.rs index 8129c92..a25b450 100644 --- a/src/client.rs +++ b/src/client.rs @@ -290,8 +290,8 @@ mod test { #[tokio::test] async fn test_get_tip() -> anyhow::Result<()> { - let reqwest_client = crate::ReqwestClient::new(); - let client = AsyncClient::new(URL, reqwest_client); + let bitreq_client = crate::BitreqClient::new(); + let client = AsyncClient::new(URL, bitreq_client); let _ = client.get_tip_height().await?; let _ = client.get_tip_hash().await?; diff --git a/src/http.rs b/src/http.rs index 2d850fc..43f1ac6 100644 --- a/src/http.rs +++ b/src/http.rs @@ -19,6 +19,16 @@ impl HttpMethod { pub const POST: Self = Self(Method::Post); } +#[cfg(feature = "bitreq")] +impl From for bitreq::Method { + fn from(method: HttpMethod) -> Self { + match method.0 { + Method::Get => bitreq::Method::Get, + Method::Post => bitreq::Method::Post, + } + } +} + /// Trait describing the behavior required of the HTTP client. pub trait Http { /// Body diff --git a/src/lib.rs b/src/lib.rs index f83b973..6801747 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,14 +1,14 @@ //! `mempool_space_api` pub mod api; +#[cfg(feature = "bitreq")] +mod bitreq_client; mod client; mod error; mod http; -#[cfg(feature = "reqwest")] -mod reqwest_client; +#[cfg(feature = "bitreq")] +pub use bitreq_client::*; pub use client::*; pub use error::*; pub use http::*; -#[cfg(feature = "reqwest")] -pub use reqwest_client::*; diff --git a/src/reqwest_client.rs b/src/reqwest_client.rs deleted file mode 100644 index f742e6f..0000000 --- a/src/reqwest_client.rs +++ /dev/null @@ -1,161 +0,0 @@ -use core::fmt; - -use bytes::Bytes; - -use crate::{Http, HttpMethod}; -pub extern crate reqwest; -pub extern crate tokio; - -/// Base backoff in milliseconds. -const BASE_BACKOFF_MILLIS: u64 = 256; -/// Default max retries. -const DEFAULT_MAX_RETRIES: u32 = 10; - -/// Wrapper for [`reqwest::Client`] to act as the HTTP implementation. -#[derive(Debug)] -pub struct ReqwestClient { - /// inner `reqwest` client. - pub inner: reqwest::Client, - /// The maximum number of times to retry a failed request. - max_retries: u32, -} - -/// Reqwest client config builder. -#[derive(Debug)] -pub struct Config { - client: ReqwestClient, -} - -impl Default for Config { - fn default() -> Self { - Self { - client: ReqwestClient { - inner: reqwest::Client::default(), - max_retries: DEFAULT_MAX_RETRIES, - }, - } - } -} - -impl Config { - /// Set the maximum number of times to retry a failed request. - pub fn max_retries(mut self, n: u32) -> Self { - self.client.max_retries = n; - self - } - - /// Build. - pub fn build(self) -> ReqwestClient { - self.client - } -} - -impl Default for ReqwestClient { - fn default() -> Self { - Self::new() - } -} - -impl ReqwestClient { - /// New with default config. - pub fn new() -> Self { - Config::default().build() - } - - /// Return a new reqwest client [`Config`]. - pub fn config() -> Config { - Config::default() - } -} - -impl Http for ReqwestClient { - type Body = Bytes; - - type Err = ReqwestError; - - async fn send<'a>( - &'a self, - method: HttpMethod, - url: &'a str, - body: impl Into, - ) -> Result - where - Self: 'a, - { - let resp = self.send_retry(method, url, body.into()).await?; - - if !resp.status().is_success() { - return Err(ReqwestError::HttpResponse { - status: resp.status().as_u16(), - message: resp.text().await?, - }); - } - - Ok(resp.bytes().await?) - } -} - -impl ReqwestClient { - /// Sends a request and allows for retrying failed attempts. See [`is_status_retryable`]. - async fn send_retry( - &self, - method: HttpMethod, - url: &str, - body: Bytes, - ) -> Result { - let mut delay = BASE_BACKOFF_MILLIS; - let mut attempts = 0; - - loop { - let request = match method { - HttpMethod::GET => self.inner.get(url), - HttpMethod::POST => self.inner.post(url).body(body.clone()), - }; - match request.send().await? { - resp if attempts < self.max_retries && is_status_retryable(resp.status()) => { - tokio::time::sleep(std::time::Duration::from_millis(delay)).await; - delay *= 2; - attempts += 1; - } - resp => return Ok(resp), - } - } - } -} - -/// Whether the response status indicates a failure which can be retried. -/// -/// Currently includes: -/// -/// - `429`: TOO_MANY_REQUESTS -/// - `500`: INTERNAL_SERVER_ERROR -/// - `503`: SERVICE_UNAVAILABLE -fn is_status_retryable(status: reqwest::StatusCode) -> bool { - [429, 500, 503].contains(&status.as_u16()) -} - -/// Error for `ReqwestClient` -#[derive(Debug)] -pub enum ReqwestError { - /// `reqwest` error. - Reqwest(reqwest::Error), - /// Reponse error. - HttpResponse { status: u16, message: String }, -} - -impl fmt::Display for ReqwestError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Reqwest(e) => write!(f, "{e}"), - Self::HttpResponse { status, message } => write!(f, "{status} {message}"), - } - } -} - -impl std::error::Error for ReqwestError {} - -impl From for ReqwestError { - fn from(e: reqwest::Error) -> Self { - Self::Reqwest(e) - } -}