Skip to content

Commit ed219da

Browse files
committed
quick barely tested impl of datagram
1 parent d9a6af8 commit ed219da

File tree

3 files changed

+207
-34
lines changed

3 files changed

+207
-34
lines changed

Diff for: examples/throughput/main.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ fn main() {
2727
throughput_message_io(Transport::Ws, CHUNK);
2828
// for platforms that support it
2929
#[cfg(feature = "unixsocket")]
30-
throughput_message_io(Transport::UnixSocket, CHUNK);
30+
throughput_message_io(Transport::UnixSocketStream, CHUNK);
31+
#[cfg(feature = "unixsocket")]
32+
throughput_message_io(Transport::UnixSocketDatagram, CHUNK);
3133
println!();
3234
throughput_native_udp(CHUNK);
3335
throughput_native_tcp(CHUNK);

Diff for: src/adapters/unix_socket.rs

+164-19
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::network::adapter::{
77
use crate::network::{RemoteAddr, Readiness, TransportConnect, TransportListen};
88

99
use mio::event::{Source};
10-
use mio::net::{UnixListener, UnixStream};
10+
use mio::net::{UnixDatagram, UnixListener, UnixStream};
1111

1212
use std::mem::MaybeUninit;
1313
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
@@ -69,17 +69,17 @@ impl Default for UnixSocketConnectConfig {
6969
}
7070
}
7171

72-
pub(crate) struct UnixSocketAdapter;
73-
impl Adapter for UnixSocketAdapter {
74-
type Remote = RemoteResource;
75-
type Local = LocalResource;
72+
pub(crate) struct UnixSocketStreamAdapter;
73+
impl Adapter for UnixSocketStreamAdapter {
74+
type Remote = StreamRemoteResource;
75+
type Local = StreamLocalResource;
7676
}
7777

78-
pub(crate) struct RemoteResource {
78+
pub(crate) struct StreamRemoteResource {
7979
stream: UnixStream
8080
}
8181

82-
impl Resource for RemoteResource {
82+
impl Resource for StreamRemoteResource {
8383
fn source(&mut self) -> &mut dyn Source {
8484
&mut self.stream
8585
}
@@ -94,14 +94,14 @@ pub fn check_stream_ready(stream: &UnixStream) -> PendingStatus{
9494
return PendingStatus::Ready;
9595
}
9696

97-
impl Remote for RemoteResource {
97+
impl Remote for StreamRemoteResource {
9898
fn connect_with(
9999
config: TransportConnect,
100100
remote_addr: RemoteAddr,
101101
) -> io::Result<ConnectionInfo<Self>> {
102102

103103
let stream_config = match config {
104-
TransportConnect::UnixSocket(config) => config,
104+
TransportConnect::UnixSocketStream(config) => config,
105105
_ => panic!("Internal error: Got wrong config"),
106106
};
107107

@@ -178,18 +178,18 @@ impl Remote for RemoteResource {
178178
}
179179
}
180180

181-
pub(crate) struct LocalResource {
181+
pub(crate) struct StreamLocalResource {
182182
listener: UnixListener,
183183
bind_path: PathBuf
184184
}
185185

186-
impl Resource for LocalResource {
186+
impl Resource for StreamLocalResource {
187187
fn source(&mut self) -> &mut dyn Source {
188188
&mut self.listener
189189
}
190190
}
191191

192-
impl Drop for LocalResource {
192+
impl Drop for StreamLocalResource {
193193
fn drop(&mut self) {
194194
// this may fail if the file is already removed
195195
match fs::remove_file(&self.bind_path) {
@@ -199,23 +199,22 @@ impl Drop for LocalResource {
199199
}
200200
}
201201

202-
impl Local for LocalResource {
203-
type Remote = RemoteResource;
202+
impl Local for StreamLocalResource {
203+
type Remote = StreamRemoteResource;
204204

205205
fn listen_with(config: TransportListen, addr: SocketAddr) -> io::Result<ListeningInfo<Self>> {
206206
let config = match config {
207-
TransportListen::UnixSocket(config) => config,
207+
TransportListen::UnixStreamSocket(config) => config,
208208
_ => panic!("Internal error: Got wrong config"),
209209
};
210210

211211
// TODO: fallback to ip when we are able to set path to none
212-
let path_copy = config.path.clone();
213-
let listener = UnixListener::bind(config.path)?;
212+
let listener = UnixListener::bind(&config.path)?;
214213
let local_addr = listener.local_addr()?;
215214
Ok(ListeningInfo {
216215
local: Self {
217216
listener,
218-
bind_path: path_copy
217+
bind_path: config.path
219218
},
220219
// same issue as above my change in https://github.com/tokio-rs/mio/pull/1749
221220
// relevant issue https://github.com/tokio-rs/mio/issues/1527
@@ -228,7 +227,7 @@ impl Local for LocalResource {
228227
match self.listener.accept() {
229228
Ok((stream, addr)) => accept_remote(AcceptedType::Remote(
230229
create_null_socketaddr(), // TODO: provide correct address
231-
RemoteResource { stream },
230+
StreamRemoteResource { stream },
232231
)),
233232
Err(ref err) if err.kind() == ErrorKind::WouldBlock => break,
234233
Err(ref err) if err.kind() == ErrorKind::Interrupted => continue,
@@ -237,3 +236,149 @@ impl Local for LocalResource {
237236
}
238237
}
239238
}
239+
240+
pub(crate) struct DatagramRemoteResource {
241+
datagram: UnixDatagram
242+
}
243+
244+
impl Resource for DatagramRemoteResource {
245+
fn source(&mut self) -> &mut dyn Source {
246+
&mut self.datagram
247+
}
248+
}
249+
250+
impl Remote for DatagramRemoteResource {
251+
fn connect_with(
252+
config: TransportConnect,
253+
remote_addr: RemoteAddr,
254+
) -> io::Result<ConnectionInfo<Self>> {
255+
let config = match config {
256+
TransportConnect::UnixSocketDatagram(config) => config,
257+
_ => panic!("Internal error: Got wrong config"),
258+
};
259+
260+
let datagram = UnixDatagram::unbound()?;
261+
datagram.connect(config.path)?;
262+
263+
Ok(ConnectionInfo {
264+
local_addr: create_null_socketaddr(),
265+
peer_addr: create_null_socketaddr(),
266+
remote: Self {
267+
datagram
268+
}
269+
})
270+
}
271+
272+
// A majority of send, reciev and accept in local are reused from udp.rs due to similarities
273+
274+
fn receive(&self, mut process_data: impl FnMut(&[u8])) -> ReadStatus {
275+
let buffer: MaybeUninit<[u8; MAX_PAYLOAD_LEN]> = MaybeUninit::uninit();
276+
let mut input_buffer = unsafe { buffer.assume_init() }; // Avoid to initialize the array
277+
278+
loop {
279+
match self.datagram.recv(&mut input_buffer) {
280+
Ok(size) => process_data(&mut input_buffer[..size]),
281+
Err(ref err) if err.kind() == ErrorKind::WouldBlock => {
282+
break ReadStatus::WaitNextEvent
283+
}
284+
Err(ref err) if err.kind() == ErrorKind::ConnectionRefused => {
285+
// Avoid ICMP generated error to be logged
286+
break ReadStatus::WaitNextEvent
287+
}
288+
Err(err) => {
289+
log::error!("unix datagram socket receive error: {}", err);
290+
break ReadStatus::WaitNextEvent // Should not happen
291+
}
292+
}
293+
}
294+
}
295+
296+
fn send(&self, data: &[u8]) -> SendStatus {
297+
loop {
298+
match self.datagram.send(data) {
299+
Ok(_) => break SendStatus::Sent,
300+
// Avoid ICMP generated error to be logged
301+
Err(ref err) if err.kind() == ErrorKind::ConnectionRefused => {
302+
break SendStatus::ResourceNotFound
303+
}
304+
Err(ref err) if err.kind() == ErrorKind::WouldBlock => continue,
305+
Err(ref err) if err.kind() == ErrorKind::Other => {
306+
break SendStatus::MaxPacketSizeExceeded
307+
}
308+
Err(err) => {
309+
log::error!("unix datagram socket send error: {}", err);
310+
break SendStatus::ResourceNotFound // should not happen
311+
}
312+
}
313+
}
314+
}
315+
316+
fn pending(&self, readiness: Readiness) -> PendingStatus {
317+
PendingStatus::Ready
318+
}
319+
}
320+
// datagram is also used for listener
321+
pub(crate) struct DatagramLocalResource {
322+
listener: UnixDatagram,
323+
bind_path: PathBuf
324+
}
325+
326+
impl Resource for DatagramLocalResource {
327+
fn source(&mut self) -> &mut dyn Source {
328+
&mut self.listener
329+
}
330+
}
331+
332+
333+
impl Local for DatagramLocalResource {
334+
type Remote = DatagramRemoteResource;
335+
336+
fn listen_with(config: TransportListen, addr: SocketAddr) -> io::Result<ListeningInfo<Self>> {
337+
let config = match config {
338+
TransportListen::UnixDatagramSocket(config) => config,
339+
_ => panic!("Internal error: Got wrong config"),
340+
};
341+
342+
let listener = UnixDatagram::bind(&config.path)?;
343+
344+
Ok(ListeningInfo {
345+
local: Self {
346+
listener,
347+
bind_path: config.path
348+
},
349+
local_addr: create_null_socketaddr(),
350+
})
351+
}
352+
353+
fn accept(&self, mut accept_remote: impl FnMut(AcceptedType<'_, Self::Remote>)) {
354+
let buffer: MaybeUninit<[u8; MAX_PAYLOAD_LEN]> = MaybeUninit::uninit();
355+
let mut input_buffer = unsafe { buffer.assume_init() }; // Avoid to initialize the array
356+
357+
loop {
358+
match self.listener.recv_from(&mut input_buffer) {
359+
Ok((size, addr)) => {
360+
let data = &mut input_buffer[..size];
361+
accept_remote(AcceptedType::Data(create_null_socketaddr(), data))
362+
}
363+
Err(ref err) if err.kind() == ErrorKind::WouldBlock => break,
364+
Err(err) => break log::error!("Unix datagram socket accept error: {}", err), // Should never happen
365+
};
366+
}
367+
}
368+
}
369+
370+
impl Drop for DatagramLocalResource {
371+
fn drop(&mut self) {
372+
// this may fail if the file is already removed
373+
match fs::remove_file(&self.bind_path) {
374+
Ok(_) => (),
375+
Err(err) => log::error!("Error removing unix socket file on drop: {}", err),
376+
}
377+
}
378+
}
379+
380+
pub(crate) struct UnixSocketDatagramAdapter;
381+
impl Adapter for UnixSocketDatagramAdapter {
382+
type Remote = DatagramRemoteResource;
383+
type Local = DatagramLocalResource;
384+
}

0 commit comments

Comments
 (0)