Skip to content

Commit

Permalink
connection: allow specifying address when sending client responses (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
jkarneges authored Oct 3, 2024
1 parent 6a24012 commit 8eb90db
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 36 deletions.
12 changes: 6 additions & 6 deletions src/connmgr/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ struct ConnectionStreamOpts {
blocks_avail: Arc<Counter>,
messages_max: usize,
allow_compression: bool,
sender: channel::LocalSender<zmq::Message>,
sender: channel::LocalSender<(Option<ArrayVec<u8, FROM_MAX>>, zmq::Message)>,
}

struct Worker {
Expand Down Expand Up @@ -1024,8 +1024,8 @@ impl Worker {
id: usize,
stop: AsyncLocalReceiver<()>,
done: AsyncLocalSender<zhttpsocket::AsyncServerStreamHandle>,
zstream_out_receiver: AsyncLocalReceiver<zmq::Message>,
zstream_out_sender: channel::LocalSender<zmq::Message>,
zstream_out_receiver: AsyncLocalReceiver<(Option<ArrayVec<u8, FROM_MAX>>, zmq::Message)>,
zstream_out_sender: channel::LocalSender<(Option<ArrayVec<u8, FROM_MAX>>, zmq::Message)>,
spawner: Spawner,
resolver: Arc<Resolver>,
tls_config_cache: Arc<TlsConfigCache>,
Expand Down Expand Up @@ -1098,7 +1098,7 @@ impl Worker {
Select6::R1(_) => break,
// receiver_recv
Select6::R2(result) => match result {
Ok(msg) => handle_send.set(Some(stream_handle.send(None, msg))),
Ok((addr, msg)) => handle_send.set(Some(stream_handle.send(addr, msg))),
Err(e) => panic!("zstream_out_receiver channel error: {}", e),
},
// handle_send
Expand Down Expand Up @@ -1487,7 +1487,7 @@ impl Worker {
stop: AsyncLocalReceiver<()>,
_done: AsyncLocalSender<()>,
instance_id: Rc<String>,
sender: channel::LocalSender<zmq::Message>,
sender: channel::LocalSender<(Option<ArrayVec<u8, 64>>, zmq::Message)>,
conns: Rc<Connections>,
) {
debug!("client-worker {}: task started: keep_alives", id);
Expand Down Expand Up @@ -1552,7 +1552,7 @@ impl Worker {
id, count
);

if let Err(e) = send.try_send(msg) {
if let Err(e) = send.try_send((None, msg)) {
error!("zhttp write error: {}", e);
}
}
Expand Down
94 changes: 64 additions & 30 deletions src/connmgr/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,7 @@ struct ZhttpServerStreamSessionOut<'a> {
instance_id: &'a str,
id: &'a [u8],
packet_buf: &'a RefCell<Vec<u8>>,
sender: &'a AsyncLocalSender<zmq::Message>,
sender: &'a AsyncLocalSender<(Option<ArrayVec<u8, 64>>, zmq::Message)>,
shared: &'a StreamSharedData,
}

Expand All @@ -926,7 +926,7 @@ impl<'a> ZhttpServerStreamSessionOut<'a> {
instance_id: &'a str,
id: &'a [u8],
packet_buf: &'a RefCell<Vec<u8>>,
sender: &'a AsyncLocalSender<zmq::Message>,
sender: &'a AsyncLocalSender<(Option<ArrayVec<u8, 64>>, zmq::Message)>,
shared: &'a StreamSharedData,
) -> Self {
Self {
Expand Down Expand Up @@ -971,7 +971,7 @@ impl<'a> ZhttpServerStreamSessionOut<'a> {
make_zhttp_response(addr, zresp, packet_buf)?
};

self.sender.try_send(msg)?;
self.sender.try_send((None, msg))?;

self.shared.inc_out_seq();

Expand Down Expand Up @@ -5478,7 +5478,7 @@ async fn client_stream_connect<E, R1, R2>(
tls_config_cache: &TlsConfigCache,
pool: &ConnectionPool,
zreceiver: &TrackedAsyncLocalReceiver<'_, (arena::Rc<zhttppacket::OwnedRequest>, usize)>,
zsender: &AsyncLocalSender<zmq::Message>,
zsender: &AsyncLocalSender<(Option<ArrayVec<u8, 64>>, zmq::Message)>,
shared: &StreamSharedData,
enable_routing: &E,
response_received: &mut bool,
Expand Down Expand Up @@ -5720,7 +5720,7 @@ async fn client_stream_connection_inner<E>(
tls_config_cache: &TlsConfigCache,
pool: &ConnectionPool,
zreceiver: &TrackedAsyncLocalReceiver<'_, (arena::Rc<zhttppacket::OwnedRequest>, usize)>,
zsender: AsyncLocalSender<zmq::Message>,
zsender: AsyncLocalSender<(Option<ArrayVec<u8, 64>>, zmq::Message)>,
shared: arena::Rc<StreamSharedData>,
enable_routing: &E,
) -> Result<(), Error>
Expand Down Expand Up @@ -5833,7 +5833,7 @@ where

if let Some(msg) = msg {
// best effort
let _ = zsender.try_send(msg);
let _ = zsender.try_send((None, msg));

shared.inc_out_seq();
}
Expand Down Expand Up @@ -5867,7 +5867,7 @@ pub async fn client_stream_connection<E>(
tls_config_cache: &TlsConfigCache,
pool: &ConnectionPool,
zreceiver: AsyncLocalReceiver<(arena::Rc<zhttppacket::OwnedRequest>, usize)>,
zsender: AsyncLocalSender<zmq::Message>,
zsender: AsyncLocalSender<(Option<ArrayVec<u8, 64>>, zmq::Message)>,
shared: arena::Rc<StreamSharedData>,
enable_routing: &E,
) where
Expand Down Expand Up @@ -9196,7 +9196,7 @@ mod tests {
sock: Rc<RefCell<FakeSock>>,
allow_compression: bool,
r_to_conn: channel::LocalReceiver<(arena::Rc<zhttppacket::OwnedRequest>, usize)>,
s_from_conn: channel::LocalSender<zmq::Message>,
s_from_conn: channel::LocalSender<(Option<ArrayVec<u8, 64>>, zmq::Message)>,
shared: arena::Rc<StreamSharedData>,
) -> Result<(), Error> {
let mut sock = AsyncFakeSock::new(sock);
Expand Down Expand Up @@ -9333,15 +9333,22 @@ mod tests {
let mut executor = StepExecutor::new(&reactor, fut);

// fill the handler's outbound message queue
assert_eq!(s_from_conn.try_send(zmq::Message::new()).is_ok(), true);
assert_eq!(s_from_conn.try_send(zmq::Message::new()).is_err(), true);
assert_eq!(
s_from_conn.try_send((None, zmq::Message::new())).is_ok(),
true
);
assert_eq!(
s_from_conn.try_send((None, zmq::Message::new())).is_err(),
true
);
drop(s_from_conn);

// handler won't be able to send a message yet
assert_eq!(check_poll(executor.step()), None);

// read bogus message
let msg = r_from_conn.try_recv().unwrap();
let (addr, msg) = r_from_conn.try_recv().unwrap();
assert!(addr.is_none());
assert_eq!(msg.is_empty(), true);

// no other messages
Expand All @@ -9351,7 +9358,8 @@ mod tests {
assert_eq!(check_poll(executor.step()), None);

// read real message
let msg = r_from_conn.try_recv().unwrap();
let (addr, msg) = r_from_conn.try_recv().unwrap();
assert!(addr.is_none());

// no other messages
assert_eq!(r_from_conn.try_recv().is_err(), true);
Expand Down Expand Up @@ -9386,7 +9394,8 @@ mod tests {
assert_eq!(str::from_utf8(&buf).unwrap(), expected);

// read message
let msg = r_from_conn.try_recv().unwrap();
let (addr, msg) = r_from_conn.try_recv().unwrap();
assert!(addr.is_none());

// no other messages
assert_eq!(r_from_conn.try_recv().is_err(), true);
Expand Down Expand Up @@ -9440,7 +9449,8 @@ mod tests {
assert_eq!(check_poll(executor.step()), None);

// read message
let msg = r_from_conn.try_recv().unwrap();
let (addr, msg) = r_from_conn.try_recv().unwrap();
assert!(addr.is_none());

// no other messages
assert_eq!(r_from_conn.try_recv().is_err(), true);
Expand All @@ -9459,7 +9469,8 @@ mod tests {
assert_eq!(check_poll(executor.step()), Some(()));

// read message
let msg = r_from_conn.try_recv().unwrap();
let (addr, msg) = r_from_conn.try_recv().unwrap();
assert!(addr.is_none());

// no other messages
assert_eq!(r_from_conn.try_recv().is_err(), true);
Expand Down Expand Up @@ -9532,7 +9543,8 @@ mod tests {
assert_eq!(check_poll(executor.step()), None);

// read message
let msg = r_from_conn.try_recv().unwrap();
let (addr, msg) = r_from_conn.try_recv().unwrap();
assert!(addr.is_none());

// no other messages
assert!(r_from_conn.try_recv().is_err());
Expand Down Expand Up @@ -9569,7 +9581,8 @@ mod tests {
sock.borrow_mut().clear_write_allowed();

// read message
let msg = r_from_conn.try_recv().unwrap();
let (addr, msg) = r_from_conn.try_recv().unwrap();
assert!(addr.is_none());

// no other messages
assert!(r_from_conn.try_recv().is_err());
Expand Down Expand Up @@ -9615,7 +9628,8 @@ mod tests {
assert_eq!(check_poll(executor.step()), None);

// read message
let msg = r_from_conn.try_recv().unwrap();
let (addr, msg) = r_from_conn.try_recv().unwrap();
assert!(addr.is_none());

// no other messages
assert!(r_from_conn.try_recv().is_err());
Expand Down Expand Up @@ -9685,15 +9699,22 @@ mod tests {
let mut executor = StepExecutor::new(&reactor, fut);

// fill the handler's outbound message queue
assert_eq!(s_from_conn.try_send(zmq::Message::new()).is_ok(), true);
assert_eq!(s_from_conn.try_send(zmq::Message::new()).is_err(), true);
assert_eq!(
s_from_conn.try_send((None, zmq::Message::new())).is_ok(),
true
);
assert_eq!(
s_from_conn.try_send((None, zmq::Message::new())).is_err(),
true
);
drop(s_from_conn);

// handler won't be able to send a message yet
assert_eq!(check_poll(executor.step()), None);

// read bogus message
let msg = r_from_conn.try_recv().unwrap();
let (addr, msg) = r_from_conn.try_recv().unwrap();
assert!(addr.is_none());
assert_eq!(msg.is_empty(), true);

// no other messages
Expand All @@ -9703,7 +9724,8 @@ mod tests {
assert_eq!(check_poll(executor.step()), None);

// read real message
let msg = r_from_conn.try_recv().unwrap();
let (addr, msg) = r_from_conn.try_recv().unwrap();
assert!(addr.is_none());

// no other messages
assert_eq!(r_from_conn.try_recv().is_err(), true);
Expand Down Expand Up @@ -9785,7 +9807,8 @@ mod tests {
assert_eq!(check_poll(executor.step()), None);

// read message
let msg = r_from_conn.try_recv().unwrap();
let (addr, msg) = r_from_conn.try_recv().unwrap();
assert!(addr.is_none());

// no other messages
assert_eq!(r_from_conn.try_recv().is_err(), true);
Expand Down Expand Up @@ -9826,7 +9849,8 @@ mod tests {
assert_eq!(check_poll(executor.step()), None);

// read message
let msg = r_from_conn.try_recv().unwrap();
let (addr, msg) = r_from_conn.try_recv().unwrap();
assert!(addr.is_none());

let buf = &msg[..];

Expand Down Expand Up @@ -9924,15 +9948,22 @@ mod tests {
let mut executor = StepExecutor::new(&reactor, fut);

// fill the handler's outbound message queue
assert_eq!(s_from_conn.try_send(zmq::Message::new()).is_ok(), true);
assert_eq!(s_from_conn.try_send(zmq::Message::new()).is_err(), true);
assert_eq!(
s_from_conn.try_send((None, zmq::Message::new())).is_ok(),
true
);
assert_eq!(
s_from_conn.try_send((None, zmq::Message::new())).is_err(),
true
);
drop(s_from_conn);

// handler won't be able to send a message yet
assert_eq!(check_poll(executor.step()), None);

// read bogus message
let msg = r_from_conn.try_recv().unwrap();
let (addr, msg) = r_from_conn.try_recv().unwrap();
assert!(addr.is_none());
assert_eq!(msg.is_empty(), true);

// no other messages
Expand All @@ -9942,7 +9973,8 @@ mod tests {
assert_eq!(check_poll(executor.step()), None);

// read real message
let msg = r_from_conn.try_recv().unwrap();
let (addr, msg) = r_from_conn.try_recv().unwrap();
assert!(addr.is_none());

// no other messages
assert_eq!(r_from_conn.try_recv().is_err(), true);
Expand Down Expand Up @@ -10026,7 +10058,8 @@ mod tests {
assert_eq!(check_poll(executor.step()), None);

// read message
let msg = r_from_conn.try_recv().unwrap();
let (addr, msg) = r_from_conn.try_recv().unwrap();
assert!(addr.is_none());

// no other messages
assert_eq!(r_from_conn.try_recv().is_err(), true);
Expand Down Expand Up @@ -10078,7 +10111,8 @@ mod tests {
assert_eq!(check_poll(executor.step()), None);

// read message
let msg = r_from_conn.try_recv().unwrap();
let (addr, msg) = r_from_conn.try_recv().unwrap();
assert!(addr.is_none());

let buf = &msg[..];

Expand Down

0 comments on commit 8eb90db

Please sign in to comment.