Skip to content

feat: optional serialization support for core types #64

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Mar 28, 2025
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ iroh-io = { version = "0.6.0", default-features = false, optional = true }
positioned-io = { version = "0.3.1", default-features = false }
genawaiter = { version = "0.99.1", features = ["futures03"], optional = true }
tokio = { version = "1", features = ["sync"], default-features = false, optional = true }
serde = { version = "1", features = ["derive"], optional = true }

[features]
serde = ["dep:serde", "bytes/serde"]
tokio_fsm = ["dep:futures-lite", "dep:iroh-io"]
validate = ["dep:genawaiter"]
experimental-mixed = ["dep:tokio"]
default = ["tokio_fsm", "validate"]
default = ["tokio_fsm", "validate", "serde"]

[dev-dependencies]
hex = "0.4.3"
Expand Down
2 changes: 2 additions & 0 deletions src/io/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ impl DecodeError {
/// or a size mismatch. If the remote end stops listening while we are writing,
/// the error will indicate which parent or chunk we were writing at the time.
#[derive(Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum EncodeError {
/// The hash of a parent did not match the expected hash
ParentHashMismatch(TreeNode),
Expand All @@ -99,6 +100,7 @@ pub enum EncodeError {
/// File size does not match size in outboard
SizeMismatch,
/// There was an error reading from the underlying io
#[cfg_attr(feature = "serde", serde(with = "crate::io_error_serde"))]
Io(io::Error),
}

Expand Down
95 changes: 64 additions & 31 deletions src/io/mixed.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
//! Read from sync, send to tokio sender
use std::result;
use std::{future::Future, result};

use bytes::Bytes;
use iroh_blake3 as blake3;
use iroh_blake3::guts::parent_cv;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;

use super::{sync::Outboard, EncodeError, Leaf, Parent};
Expand All @@ -13,7 +14,7 @@ use crate::{
};

/// A content item for the bao streaming protocol.
#[derive(Debug)]
#[derive(Debug, Serialize, Deserialize)]
pub enum EncodedItem {
/// total data size, will be the first item
Size(u64),
Expand Down Expand Up @@ -45,28 +46,52 @@ impl From<EncodeError> for EncodedItem {
}
}

/// Abstract sender trait for sending encoded items.
///
/// Implemented for `tokio::sync::mpsc::Sender<EncodedItem>`; other transports
/// can implement it to receive the traversal output. The returned future
/// borrows `self` for its lifetime (`+ '_`), so items are sent one at a time.
pub trait Sender {
    /// Error produced when an item could not be delivered,
    /// e.g. because the receiving side was dropped.
    type Error;
    /// Send a single [`EncodedItem`], resolving to `Err(Self::Error)` on failure.
    fn send(
        &mut self,
        item: EncodedItem,
    ) -> impl Future<Output = std::result::Result<(), Self::Error>> + '_;
}

impl Sender for tokio::sync::mpsc::Sender<EncodedItem> {
    type Error = tokio::sync::mpsc::error::SendError<EncodedItem>;

    /// Forward the item to the tokio mpsc channel.
    ///
    /// The inherent `send` is called via a fully-qualified path so the call
    /// does not resolve back to this trait method.
    fn send(
        &mut self,
        item: EncodedItem,
    ) -> impl Future<Output = std::result::Result<(), Self::Error>> + '_ {
        async move { tokio::sync::mpsc::Sender::send(self, item).await }
    }
}

/// Traverse ranges relevant to a query from a reader and outboard to a stream
///
/// This function validates the data before writing.
///
/// It is possible to encode ranges from a partial file and outboard.
/// This will either succeed if the requested ranges are all present, or fail
/// as soon as a range is missing.
pub async fn traverse_ranges_validated<D: ReadBytesAt, O: Outboard>(
pub async fn traverse_ranges_validated<D, O, F>(
data: D,
outboard: O,
ranges: &ChunkRangesRef,
encoded: &tokio::sync::mpsc::Sender<EncodedItem>,
) {
encoded
.send(EncodedItem::Size(outboard.tree().size()))
.await
.ok();
let res = match traverse_ranges_validated_impl(data, outboard, ranges, encoded).await {
Ok(()) => EncodedItem::Done,
send: &mut F,
) -> std::result::Result<(), F::Error>
where
D: ReadBytesAt,
O: Outboard,
F: Sender,
{
send.send(EncodedItem::Size(outboard.tree().size())).await?;
let res = match traverse_ranges_validated_impl(data, outboard, ranges, send).await {
Ok(Ok(())) => EncodedItem::Done,
Err(cause) => EncodedItem::Error(cause),
Ok(Err(err)) => return Err(err),
};
encoded.send(res).await.ok();
send.send(res).await
}

/// Encode ranges relevant to a query from a reader and outboard to a writer
Expand All @@ -76,14 +101,19 @@ pub async fn traverse_ranges_validated<D: ReadBytesAt, O: Outboard>(
/// It is possible to encode ranges from a partial file and outboard.
/// This will either succeed if the requested ranges are all present, or fail
/// as soon as a range is missing.
async fn traverse_ranges_validated_impl<D: ReadBytesAt, O: Outboard>(
async fn traverse_ranges_validated_impl<D, O, F>(
data: D,
outboard: O,
ranges: &ChunkRangesRef,
encoded: &tokio::sync::mpsc::Sender<EncodedItem>,
) -> result::Result<(), EncodeError> {
send: &mut F,
) -> result::Result<std::result::Result<(), F::Error>, EncodeError>
where
D: ReadBytesAt,
O: Outboard,
F: Sender,
{
if ranges.is_empty() {
return Ok(());
return Ok(Ok(()));
}
let mut stack: SmallVec<[_; 10]> = SmallVec::<[blake3::Hash; 10]>::new();
stack.push(outboard.root());
Expand Down Expand Up @@ -112,16 +142,13 @@ async fn traverse_ranges_validated_impl<D: ReadBytesAt, O: Outboard>(
if left {
stack.push(l_hash);
}
encoded
.send(
Parent {
node,
pair: (l_hash, r_hash),
}
.into(),
)
.await
.ok();
let item = Parent {
node,
pair: (l_hash, r_hash),
};
if let Err(e) = send.send(item.into()).await {
return Ok(Err(e));
}
}
BaoChunk::Leaf {
start_chunk,
Expand Down Expand Up @@ -152,7 +179,9 @@ async fn traverse_ranges_validated_impl<D: ReadBytesAt, O: Outboard>(
return Err(EncodeError::LeafHashMismatch(start_chunk));
}
for item in out_buf.into_iter() {
encoded.send(item).await.ok();
if let Err(e) = send.send(item).await {
return Ok(Err(e));
}
}
} else {
let actual = hash_subtree(start_chunk.0, &buffer, is_root);
Expand All @@ -164,12 +193,14 @@ async fn traverse_ranges_validated_impl<D: ReadBytesAt, O: Outboard>(
data: buffer,
offset: start_chunk.to_bytes(),
};
encoded.send(item.into()).await.ok();
if let Err(e) = send.send(item.into()).await {
return Ok(Err(e));
}
};
}
}
}
Ok(())
Ok(Ok(()))
}

/// Encode ranges relevant to a query from a slice and outboard to a buffer.
Expand Down Expand Up @@ -299,11 +330,13 @@ mod tests {
async fn smoke() {
let data = [0u8; 100000];
let outboard = PreOrderMemOutboard::create(data, BlockSize::from_chunk_log(4));
let (tx, mut rx) = tokio::sync::mpsc::channel(10);
let (mut tx, mut rx) = tokio::sync::mpsc::channel(10);
let mut encoded = Vec::new();
encode_ranges_validated(&data[..], &outboard, &ChunkRanges::empty(), &mut encoded).unwrap();
tokio::spawn(async move {
traverse_ranges_validated(&data[..], &outboard, &ChunkRanges::empty(), &tx).await;
traverse_ranges_validated(&data[..], &outboard, &ChunkRanges::empty(), &mut tx)
.await
.unwrap();
});
let mut res = Vec::new();
while let Some(item) = rx.recv().await {
Expand Down
59 changes: 55 additions & 4 deletions src/io/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
//! Implementation of bao streaming for std io and tokio io
use std::pin::Pin;

use bytes::Bytes;

use crate::{blake3, BlockSize, ChunkNum, ChunkRanges, TreeNode};

mod error;
use std::future::Future;

pub use error::*;
use range_collections::{range_set::RangeSetRange, RangeSetRef};
Expand All @@ -27,7 +24,58 @@ pub struct Parent {
pub pair: (blake3::Hash, blake3::Hash),
}

#[cfg(feature = "serde")]
mod serde_support {
    use serde::{ser::SerializeSeq, Deserialize, Serialize};

    use super::{blake3, Parent, TreeNode};

    // A `Parent` is (de)serialized as a 3-element sequence:
    // tree node, left child hash, right child hash.
    impl Serialize for Parent {
        fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
            let (l, r) = self.pair;
            // BUGFIX: the sequence contains 3 elements (node + both hashes).
            // The length hint was previously `Some(2)`, which corrupts or
            // rejects output in non-self-describing formats (bincode,
            // postcard) that rely on the declared length.
            let mut seq = serializer.serialize_seq(Some(3))?;
            seq.serialize_element(&self.node)?;
            seq.serialize_element(l.as_bytes())?;
            seq.serialize_element(r.as_bytes())?;
            seq.end()
        }
    }

    impl<'a> Deserialize<'a> for Parent {
        fn deserialize<D: serde::Deserializer<'a>>(deserializer: D) -> Result<Self, D::Error> {
            struct ParentVisitor;
            impl<'de> serde::de::Visitor<'de> for ParentVisitor {
                type Value = Parent;

                fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                    formatter.write_str("a parent node")
                }

                // Elements must be read in the exact order the serializer
                // wrote them: node, left hash, right hash.
                fn visit_seq<A: serde::de::SeqAccess<'de>>(
                    self,
                    mut seq: A,
                ) -> Result<Self::Value, A::Error> {
                    let node = seq.next_element::<TreeNode>()?.ok_or_else(|| {
                        serde::de::Error::invalid_length(0, &"a parent node with 3 elements")
                    })?;
                    let l = seq.next_element::<[u8; 32]>()?.ok_or_else(|| {
                        serde::de::Error::invalid_length(1, &"a parent node with 3 elements")
                    })?;
                    let r = seq.next_element::<[u8; 32]>()?.ok_or_else(|| {
                        serde::de::Error::invalid_length(2, &"a parent node with 3 elements")
                    })?;
                    Ok(Parent {
                        node,
                        pair: (blake3::Hash::from(l), blake3::Hash::from(r)),
                    })
                }
            }
            deserializer.deserialize_seq(ParentVisitor)
        }
    }
}

/// A leaf node.
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Leaf {
/// The byte offset of the leaf in the file.
pub offset: u64,
Expand All @@ -49,6 +97,7 @@ impl std::fmt::Debug for Leaf {
/// After reading the initial header, the only possible items are `Parent` and
/// `Leaf`.
#[derive(Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum BaoContentItem {
/// a parent node, to update the outboard
Parent(Parent),
Expand Down Expand Up @@ -161,7 +210,9 @@ pub(crate) fn combine_hash_pair(l: &blake3::Hash, r: &blake3::Hash) -> [u8; 64]
res
}

pub(crate) type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
#[cfg(feature = "validate")]
pub(crate) type LocalBoxFuture<'a, T> =
std::pin::Pin<Box<dyn std::future::Future<Output = T> + 'a>>;

#[cfg(test)]
mod tests {
Expand Down
Loading
Loading