Skip to content

Commit 96f0302

Browse files
committed
moq-transport: add fetch messages
1 parent 8e06318 commit 96f0302

File tree

9 files changed

+223
-0
lines changed

9 files changed

+223
-0
lines changed

moq-transport/src/message/fetch.rs

+83
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params, Tuple};
2+
use crate::message::GroupOrder;
3+
4+
/// Sent by the subscriber to request to request a range
5+
/// of already published objects within a track.
6+
#[derive(Clone, Debug)]
7+
pub struct Fetch {
8+
/// The subscription ID
9+
pub id: u64,
10+
11+
/// Track properties
12+
pub track_namespace: Tuple,
13+
pub track_name: String,
14+
15+
/// Subscriber Priority
16+
pub subscriber_priority: u8,
17+
18+
pub group_order: GroupOrder,
19+
20+
/// The start/end group/object.
21+
pub start_group: u64,
22+
pub start_object: u64,
23+
pub end_group: u64,
24+
pub end_object: u64,
25+
26+
/// Optional parameters
27+
pub params: Params,
28+
}
29+
30+
impl Decode for Fetch {
31+
fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
32+
let id = u64::decode(r)?;
33+
34+
let track_namespace = Tuple::decode(r)?;
35+
let track_name = String::decode(r)?;
36+
37+
let subscriber_priority = u8::decode(r)?;
38+
39+
let group_order = GroupOrder::decode(r)?;
40+
41+
let start_group = u64::decode(r)?;
42+
let start_object = u64::decode(r)?;
43+
let end_group = u64::decode(r)?;
44+
let end_object = u64::decode(r)?;
45+
46+
let params = Params::decode(r)?;
47+
48+
Ok(Self {
49+
id,
50+
track_namespace,
51+
track_name,
52+
subscriber_priority,
53+
group_order,
54+
start_group,
55+
start_object,
56+
end_group,
57+
end_object,
58+
params,
59+
})
60+
}
61+
}
62+
63+
impl Encode for Fetch {
64+
fn encode<W: bytes::BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
65+
self.id.encode(w)?;
66+
67+
self.track_namespace.encode(w)?;
68+
self.track_name.encode(w)?;
69+
70+
self.subscriber_priority.encode(w)?;
71+
72+
self.group_order.encode(w)?;
73+
74+
self.start_group.encode(w)?;
75+
self.start_object.encode(w)?;
76+
self.end_group.encode(w)?;
77+
self.end_object.encode(w)?;
78+
79+
self.params.encode(w)?;
80+
81+
Ok(())
82+
}
83+
}
+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
use crate::coding::{Decode, DecodeError, Encode, EncodeError};
2+
3+
/// A subscriber issues a FETCH_CANCEL message to a publisher indicating it is
4+
/// no longer interested in receiving Objects for the fetch specified by 'Subscribe ID'.
5+
#[derive(Clone, Debug)]
6+
pub struct FetchCancel {
7+
/// The subscription ID
8+
pub id: u64,
9+
}
10+
11+
impl Decode for FetchCancel {
12+
fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
13+
let id = u64::decode(r)?;
14+
Ok(Self { id })
15+
}
16+
}
17+
18+
impl Encode for FetchCancel {
19+
fn encode<W: bytes::BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
20+
self.id.encode(w)?;
21+
22+
Ok(())
23+
}
24+
}
+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
use crate::coding::{Decode, DecodeError, Encode, EncodeError};
2+
3+
/// Sent by the server to indicate that the client should connect to a different server.
4+
#[derive(Clone, Debug)]
5+
pub struct FetchError {
6+
/// The ID for this subscription.
7+
pub id: u64,
8+
9+
/// An error code.
10+
pub code: u64,
11+
12+
/// An optional, human-readable reason.
13+
pub reason: String,
14+
}
15+
16+
impl Decode for FetchError {
17+
fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
18+
let id = u64::decode(r)?;
19+
20+
let code = u64::decode(r)?;
21+
let reason = String::decode(r)?;
22+
23+
Ok(Self { id, code, reason })
24+
}
25+
}
26+
27+
impl Encode for FetchError {
28+
fn encode<W: bytes::BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
29+
self.id.encode(w)?;
30+
31+
self.code.encode(w)?;
32+
self.reason.encode(w)?;
33+
34+
Ok(())
35+
}
36+
}

moq-transport/src/message/fetch_ok.rs

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params};
2+
3+
/// A publisher sends a FETCH_OK control message in response to successful fetches.
4+
#[derive(Clone, Debug)]
5+
pub struct FetchOk {
6+
/// The subscription ID
7+
pub id: u64,
8+
9+
/// Order groups will be delivered in
10+
pub group_order: u8,
11+
12+
/// True if all objects have been published on this track
13+
pub end_of_track: u8,
14+
15+
/// The largest Group ID available for this track (last if end_of_track)
16+
pub largest_group_id: u64,
17+
/// The largest Object ID available within the largest Group ID for this track (last if end_of_track)
18+
pub largest_object_id: u64,
19+
20+
/// Optional parameters
21+
pub params: Params,
22+
}
23+
24+
impl Decode for FetchOk {
25+
fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
26+
let id = u64::decode(r)?;
27+
28+
let group_order = u8::decode(r)?;
29+
30+
let end_of_track = u8::decode(r)?;
31+
32+
let largest_group_id = u64::decode(r)?;
33+
let largest_object_id = u64::decode(r)?;
34+
35+
let params = Params::decode(r)?;
36+
37+
Ok(Self { id, group_order, end_of_track, largest_group_id, largest_object_id, params })
38+
}
39+
}
40+
41+
impl Encode for FetchOk {
42+
fn encode<W: bytes::BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
43+
self.id.encode(w)?;
44+
45+
self.group_order.encode(w)?;
46+
47+
self.end_of_track.encode(w)?;
48+
49+
self.largest_group_id.encode(w)?;
50+
self.largest_object_id.encode(w)?;
51+
52+
self.params.encode(w)?;
53+
54+
Ok(())
55+
}
56+
}

moq-transport/src/message/mod.rs

+14
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ mod track_status_request;
5555
mod unannounce;
5656
mod unsubscribe;
5757
mod unsubscribe_namespace;
58+
mod fetch;
59+
mod fetch_cancel;
60+
mod fetch_ok;
61+
mod fetch_error;
5862

5963
pub use announce::*;
6064
pub use announce_cancel::*;
@@ -79,6 +83,10 @@ pub use track_status_request::*;
7983
pub use unannounce::*;
8084
pub use unsubscribe::*;
8185
pub use unsubscribe_namespace::*;
86+
pub use fetch::*;
87+
pub use fetch_cancel::*;
88+
pub use fetch_ok::*;
89+
pub use fetch_error::*;
8290

8391
use crate::coding::{Decode, DecodeError, Encode, EncodeError};
8492
use std::fmt;
@@ -209,6 +217,12 @@ message_types! {
209217
SubscribeNamespaceOk = 0x12,
210218
SubscribeNamespaceError = 0x13,
211219
UnsubscribeNamespace = 0x14,
220+
221+
// FETCH family, sent by subscriber
222+
Fetch = 0x16,
223+
FetchCancel = 0x17,
224+
FetchOk = 0x18,
225+
FetchError = 0x19,
212226
}
213227

214228
/// Track Status Codes

moq-transport/src/message/publisher.rs

+2
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,6 @@ publisher_msgs! {
5151
SubscribeError,
5252
SubscribeDone,
5353
TrackStatus,
54+
FetchOk,
55+
FetchError,
5456
}

moq-transport/src/message/subscriber.rs

+2
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,6 @@ subscriber_msgs! {
5656
SubscribeNamespaceOk,
5757
SubscribeNamespaceError,
5858
UnsubscribeNamespace,
59+
Fetch,
60+
FetchCancel,
5961
}

moq-transport/src/session/publisher.rs

+3
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,9 @@ impl Publisher {
183183
message::Subscriber::SubscribeNamespaceOk(_msg) => unimplemented!(),
184184
message::Subscriber::SubscribeNamespaceError(_msg) => unimplemented!(),
185185
message::Subscriber::UnsubscribeNamespace(_msg) => unimplemented!(),
186+
// TODO: Implement fetch messages
187+
message::Subscriber::Fetch(_msg) => todo!(),
188+
message::Subscriber::FetchCancel(_msg) => todo!(),
186189
};
187190

188191
if let Err(err) = res {

moq-transport/src/session/subscriber.rs

+3
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ impl Subscriber {
8686
message::Publisher::SubscribeError(msg) => self.recv_subscribe_error(msg),
8787
message::Publisher::SubscribeDone(msg) => self.recv_subscribe_done(msg),
8888
message::Publisher::TrackStatus(msg) => self.recv_track_status(msg),
89+
// TODO: Implement fetch messages
90+
message::Publisher::FetchOk(_msg) => todo!(),
91+
message::Publisher::FetchError(_msg) => todo!(),
8992
};
9093

9194
if let Err(SessionError::Serve(err)) = res {

0 commit comments

Comments
 (0)