Skip to content

Commit

Permalink
connmgr: don't send keep-alives during handoff (#48072)
Browse files: browse the repository at this point in the history
  • Loading branch information
jkarneges authored Sep 30, 2024
1 parent fbc0d7b commit fe718eb
Show file tree
Hide file tree
Showing 3 changed files with 224 additions and 151 deletions.
184 changes: 109 additions & 75 deletions src/connmgr/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,43 +223,49 @@ impl Batch {
self.nodes.remove(key.nkey);
}

fn take_group<'a, 'b: 'a, F>(&'a mut self, get_ids: F) -> Option<BatchGroup>
fn take_group<'a, 'b: 'a, F>(&'a mut self, get_id: F) -> Option<BatchGroup>
where
F: Fn(usize) -> (&'b [u8], u32),
F: Fn(usize) -> Option<(&'b [u8], u32)>,
{
// find the next addr with items
while self.addr_index < self.addrs.len() && self.addrs[self.addr_index].1.is_empty() {
self.addr_index += 1;
}

// if all are empty, we're done
if self.addr_index == self.addrs.len() {
assert!(self.nodes.is_empty());
return None;
}
let addrs = &mut self.addrs;
let mut ids = self.group_ids.get_as_new();

let (addr, keys) = &mut self.addrs[self.addr_index];
while ids.is_empty() {
// find the next addr with items
while self.addr_index < addrs.len() && addrs[self.addr_index].1.is_empty() {
self.addr_index += 1;
}

self.last_group_ckeys.clear();
// if all are empty, we're done
if self.addr_index == addrs.len() {
assert!(self.nodes.is_empty());
return None;
}

let mut ids = self.group_ids.get_as_new();
let keys = &mut addrs[self.addr_index].1;

// get ids/seqs
while ids.len() < zhttppacket::IDS_MAX {
let nkey = match keys.pop_front(&mut self.nodes) {
Some(nkey) => nkey,
None => break,
};
self.last_group_ckeys.clear();
ids.clear();

let ckey = self.nodes[nkey].value;
self.nodes.remove(nkey);
// get ids/seqs
while ids.len() < zhttppacket::IDS_MAX {
let nkey = match keys.pop_front(&mut self.nodes) {
Some(nkey) => nkey,
None => break,
};

let (id, seq) = get_ids(ckey);
let ckey = self.nodes[nkey].value;
self.nodes.remove(nkey);

self.last_group_ckeys.push(ckey);
ids.push(zhttppacket::Id { id, seq: Some(seq) });
if let Some((id, seq)) = get_id(ckey) {
self.last_group_ckeys.push(ckey);
ids.push(zhttppacket::Id { id, seq: Some(seq) });
}
}
}

let addr = &addrs[self.addr_index].0;

Some(BatchGroup { addr, ids })
}

Expand Down Expand Up @@ -547,19 +553,27 @@ impl Connections {
let batch = &mut items.batch;

while !batch.is_empty() {
let group = batch
.take_group(|ckey| {
let group = {
let group = batch.take_group(|ckey| {
let ci = &nodes[ckey].value;
let cshared = ci.shared.as_ref().unwrap().get();

// addr could have been removed after adding to the batch
cshared.to_addr().get()?;

// item is guaranteed to have an id. only items with an
// id are added to a batch, and if an item's id is
// removed then the item is removed from the batch
let id = ci.id.as_ref().unwrap();

(&id.1, cshared.out_seq())
})
.unwrap();
Some((&id.1, cshared.out_seq()))
});

match group {
Some(group) => group,
None => continue,
}
};

let count = group.ids().len();

Expand Down Expand Up @@ -1642,48 +1656,51 @@ impl Worker {
let sender = AsyncLocalSender::new(sender);

'main: loop {
while conns.batch_is_empty() {
// wait for next keep alive time
match select_2(stop.recv(), next_keep_alive_timeout.elapsed()).await {
Select2::R1(_) => break 'main,
Select2::R2(_) => {}
}
// wait for next keep alive time
match select_2(stop.recv(), next_keep_alive_timeout.elapsed()).await {
Select2::R1(_) => break,
Select2::R2(_) => {}
}

for _ in 0..conns.batch_capacity() {
if next_keep_alive_index >= conns.items_capacity() {
break;
}
for _ in 0..conns.batch_capacity() {
if next_keep_alive_index >= conns.items_capacity() {
break;
}

let key = next_keep_alive_index;
let key = next_keep_alive_index;

next_keep_alive_index += 1;
next_keep_alive_index += 1;

if conns.can_stream(key) {
// ignore errors
let _ = conns.batch_add(key);
}
if conns.can_stream(key) {
// ignore errors
let _ = conns.batch_add(key);
}
}

keep_alive_count += 1;

if keep_alive_count >= KEEP_ALIVE_BATCHES {
keep_alive_count = 0;
next_keep_alive_index = 0;
}
keep_alive_count += 1;

// keep steady pace
next_keep_alive_time += KEEP_ALIVE_INTERVAL;
next_keep_alive_timeout.set_deadline(next_keep_alive_time);
if keep_alive_count >= KEEP_ALIVE_BATCHES {
keep_alive_count = 0;
next_keep_alive_index = 0;
}

let send = match select_2(stop.recv(), sender.wait_sendable()).await {
Select2::R1(_) => break,
Select2::R2(send) => send,
};
// keep steady pace
next_keep_alive_time += KEEP_ALIVE_INTERVAL;
next_keep_alive_timeout.set_deadline(next_keep_alive_time);

while !conns.batch_is_empty() {
let send = match select_2(stop.recv(), sender.wait_sendable()).await {
Select2::R1(_) => break 'main,
Select2::R2(send) => send,
};

// there could be no message if items removed or message construction failed
let (count, msg) =
match conns.next_batch_message(&instance_id, BatchType::KeepAlive) {
Some(ret) => ret,
None => continue,
};

// there could be no message if items removed or message construction failed
if let Some((count, msg)) = conns.next_batch_message(&instance_id, BatchType::KeepAlive)
{
debug!(
"client-worker {}: sending keep alives for {} sessions",
id, count
Expand All @@ -1694,14 +1711,12 @@ impl Worker {
}
}

if conns.batch_is_empty() {
let now = reactor.now();
let now = reactor.now();

if now >= next_keep_alive_time + KEEP_ALIVE_INTERVAL {
// got really behind somehow. just skip ahead
next_keep_alive_time = now + KEEP_ALIVE_INTERVAL;
next_keep_alive_timeout.set_deadline(next_keep_alive_time);
}
if now >= next_keep_alive_time + KEEP_ALIVE_INTERVAL {
// got really behind somehow. just skip ahead
next_keep_alive_time = now + KEEP_ALIVE_INTERVAL;
next_keep_alive_timeout.set_deadline(next_keep_alive_time);
}
}

Expand Down Expand Up @@ -2644,7 +2659,7 @@ pub mod tests {
let ids = ["id-1", "id-2", "id-3"];

let group = batch
.take_group(|ckey| (ids[ckey - 1].as_bytes(), 0))
.take_group(|ckey| Some((ids[ckey - 1].as_bytes(), 0)))
.unwrap();
assert_eq!(group.ids().len(), 2);
assert_eq!(group.ids()[0].id, b"id-1");
Expand All @@ -2657,7 +2672,7 @@ pub mod tests {
assert_eq!(batch.last_group_ckeys(), &[1, 2]);

let group = batch
.take_group(|ckey| (ids[ckey - 1].as_bytes(), 0))
.take_group(|ckey| Some((ids[ckey - 1].as_bytes(), 0)))
.unwrap();
assert_eq!(group.ids().len(), 1);
assert_eq!(group.ids()[0].id, b"id-3");
Expand All @@ -2668,7 +2683,7 @@ pub mod tests {
assert_eq!(batch.last_group_ckeys(), &[3]);

assert!(batch
.take_group(|ckey| { (ids[ckey - 1].as_bytes(), 0) })
.take_group(|ckey| Some((ids[ckey - 1].as_bytes(), 0)))
.is_none());
assert_eq!(batch.last_group_ckeys(), &[3]);

Expand All @@ -2681,7 +2696,7 @@ pub mod tests {
assert_eq!(batch.len(), 1);

let group = batch
.take_group(|ckey| (ids[ckey - 1].as_bytes(), 0))
.take_group(|ckey| Some((ids[ckey - 1].as_bytes(), 0)))
.unwrap();
assert_eq!(group.ids().len(), 1);
assert_eq!(group.ids()[0].id, b"id-2");
Expand All @@ -2694,14 +2709,33 @@ pub mod tests {
assert_eq!(batch.len(), 1);
assert!(!batch.is_empty());
let group = batch
.take_group(|ckey| (ids[ckey - 1].as_bytes(), 0))
.take_group(|ckey| Some((ids[ckey - 1].as_bytes(), 0)))
.unwrap();
assert_eq!(group.ids().len(), 1);
assert_eq!(group.ids()[0].id, b"id-3");
assert_eq!(group.ids()[0].seq, Some(0));
assert_eq!(group.addr(), b"addr-a");
drop(group);
assert_eq!(batch.is_empty(), true);

assert!(batch.add(b"addr-a", 1).is_ok());
assert!(batch.add(b"addr-b", 2).is_ok());
assert!(batch.add(b"addr-b", 3).is_ok());
let group = batch
.take_group(|ckey| {
if ckey < 3 {
None
} else {
Some((ids[ckey - 1].as_bytes(), 0))
}
})
.unwrap();
assert_eq!(group.ids().len(), 1);
assert_eq!(group.ids()[0].id, b"id-3");
assert_eq!(group.ids()[0].seq, Some(0));
assert_eq!(group.addr(), b"addr-b");
drop(group);
assert_eq!(batch.is_empty(), true);
}

#[test]
Expand Down
6 changes: 6 additions & 0 deletions src/connmgr/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1888,6 +1888,9 @@ where
// check_send just finished, so this should succeed
zsess_out.try_send_msg(zreq)?;

// unset to_addr so we don't send keep-alives
zsess_in.shared.set_to_addr(None);

// pause until we get a msg
zsess_in.peek_msg().await?;

Expand Down Expand Up @@ -1917,6 +1920,9 @@ where
// check_send just finished, so this should succeed
zsess_out.try_send_msg(zresp)?;

// unset to_addr so we don't send keep-alives
zsess_in.shared.set_to_addr(None);

// pause until we get a msg
zsess_in.peek_msg().await?;

Expand Down
Loading

0 comments on commit fe718eb

Please sign in to comment.