Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@ serde_json = "1"

# Optional dependencies
bytes = { version = "1", optional = true }
reqwest = { version = "0.12.23", optional = true }
bitreq = { version = "0.2.0", features = ["async-https"], optional = true }
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"], optional = true }

[dev-dependencies]
anyhow = "1"
futures = { version = "0.3"}
log = "0.4"
mempool_space_api = { path = ".", features = ["reqwest"] }
mempool_space_api = { path = ".", features = ["bitreq"] }
miniscript = { version = "12" }
pretty_env_logger = "0.5.0"

[features]
default = []
reqwest = ["dep:reqwest", "tokio", "bytes"]
bitreq = ["dep:bitreq", "tokio", "bytes"]

[[example]]
name = "client"
Expand Down
4 changes: 2 additions & 2 deletions examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ const URL: &str = "https://mempool.space/api";

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let reqwest_client = mempool_space_api::ReqwestClient::default();
let client = AsyncClient::new(URL, &reqwest_client);
let bitreq_client = mempool_space_api::BitreqClient::default();
let client = AsyncClient::new(URL, &bitreq_client);

// GET /blocks/tip/height.
let res = client.get_tip_height().await?;
Expand Down
11 changes: 8 additions & 3 deletions examples/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ use std::sync::Arc;

/// Server url.
const URL: &str = "https://mempool.space/signet/api";
const STOP_GAP: u32 = 10;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
pretty_env_logger::init_timed();

let reqwest_client = mempool_space_api::ReqwestClient::default();
let client = AsyncClient::new(URL, &reqwest_client);
let bitreq_client = mempool_space_api::BitreqClient::default();
let client = AsyncClient::new(URL, &bitreq_client);

let desc_str = "wpkh(tprv8ZgxMBicQKsPdy6LMhUtFHAgpocR8GC6QmwMSFpZs7h6Eziw3SpThFfczTDh5rW2krkqffa11UpX3XkeTTB2FvzZKWXqPY54Y6Rq4AQ5R8L/84'/1'/0'/0/*)";
let secp = secp256k1::Secp256k1::new();
Expand Down Expand Up @@ -51,6 +52,10 @@ async fn main() -> anyhow::Result<()> {
})
.collect::<FuturesOrdered<_>>();

if futures.is_empty() {
break;
}

for (index, script, txs) in futures.try_collect::<Vec<_>>().await? {
if txs.is_empty() {
unused_ct += 1;
Expand All @@ -65,7 +70,7 @@ async fn main() -> anyhow::Result<()> {
}

// Gap limit reached
if unused_ct > 20 {
if unused_ct > STOP_GAP {
log::info!("Last active index {:?}", last_active);
break;
}
Expand Down
134 changes: 134 additions & 0 deletions src/bitreq_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
use core::fmt;

use bitreq::{Request, Response};
use bytes::Bytes;

use crate::{Http, HttpMethod};

pub extern crate bitreq;
pub extern crate tokio;

/// Base backoff in milliseconds.
const BASE_BACKOFF_MILLIS: u64 = 256;
/// Default max retries.
const DEFAULT_MAX_RETRIES: u32 = 10;

/// HTTP client implementation.
#[derive(Debug)]
pub struct BitreqClient {
/// The maximum number of times to retry a failed request.
/// In the future this may be a configurable option.
max_retries: u32,
}

impl BitreqClient {
/// New.
pub fn new() -> Self {
Self::default()
}
}

impl Default for BitreqClient {
fn default() -> Self {
Self {
max_retries: DEFAULT_MAX_RETRIES,
}
}
}

impl Http for BitreqClient {
type Body = Bytes;

type Err = BitreqError;

async fn send<'a>(
&'a self,
method: HttpMethod,
url: &'a str,
body: impl Into<Self::Body>,
) -> Result<Self::Body, Self::Err>
where
Self: 'a,
{
let resp = self.send_retry(method.into(), url, body.into()).await?;

if !is_status_ok(resp.status_code) {
return Err(BitreqError::HttpResponse {
status: resp.status_code,
message: resp.reason_phrase,
});
}

Ok(resp.into_bytes().into())
}
}

impl BitreqClient {
/// Sends a request and allows for retrying failed attempts. See [`is_status_retryable`].
async fn send_retry(
&self,
method: bitreq::Method,
url: &str,
body: Bytes,
) -> Result<Response, bitreq::Error> {
let mut delay = BASE_BACKOFF_MILLIS;
let mut attempts = 0;

loop {
match Request::new(method.clone(), url)
.with_body(body.clone())
.send_async()
.await?
{
resp if attempts < self.max_retries && is_status_retryable(resp.status_code) => {
tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
delay *= 2;
attempts += 1;
}
resp => return Ok(resp),
}
}
}
}

/// Whether the response status indicates a failure which can be retried.
///
/// Currently includes:
///
/// - `429`: TOO_MANY_REQUESTS
/// - `500`: INTERNAL_SERVER_ERROR
/// - `503`: SERVICE_UNAVAILABLE
fn is_status_retryable(status: i32) -> bool {
[429, 500, 503].contains(&status)
}

// Whether the response status code is `200 OK`.
fn is_status_ok(status: i32) -> bool {
status == 200
}

/// Error for `BitreqClient`
#[derive(Debug)]
pub enum BitreqError {
/// `bitreq` error.
Bitreq(bitreq::Error),
/// Reponse error.
HttpResponse { status: i32, message: String },
}

impl fmt::Display for BitreqError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Bitreq(e) => write!(f, "{e}"),
Self::HttpResponse { status, message } => write!(f, "{status} {message}"),
}
}
}

impl std::error::Error for BitreqError {}

impl From<bitreq::Error> for BitreqError {
fn from(e: bitreq::Error) -> Self {
Self::Bitreq(e)
}
}
4 changes: 2 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,8 @@ mod test {

#[tokio::test]
async fn test_get_tip() -> anyhow::Result<()> {
let reqwest_client = crate::ReqwestClient::new();
let client = AsyncClient::new(URL, reqwest_client);
let bitreq_client = crate::BitreqClient::new();
let client = AsyncClient::new(URL, bitreq_client);

let _ = client.get_tip_height().await?;
let _ = client.get_tip_hash().await?;
Expand Down
10 changes: 10 additions & 0 deletions src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ impl HttpMethod {
pub const POST: Self = Self(Method::Post);
}

#[cfg(feature = "bitreq")]
impl From<HttpMethod> for bitreq::Method {
fn from(method: HttpMethod) -> Self {
match method.0 {
Method::Get => bitreq::Method::Get,
Method::Post => bitreq::Method::Post,
}
}
}

/// Trait describing the behavior required of the HTTP client.
pub trait Http {
/// Body
Expand Down
8 changes: 4 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
//! `mempool_space_api`

pub mod api;
#[cfg(feature = "bitreq")]
mod bitreq_client;
mod client;
mod error;
mod http;
#[cfg(feature = "reqwest")]
mod reqwest_client;

#[cfg(feature = "bitreq")]
pub use bitreq_client::*;
pub use client::*;
pub use error::*;
pub use http::*;
#[cfg(feature = "reqwest")]
pub use reqwest_client::*;
Loading