Skip to content

Commit

Permalink
batch: organize peers by whether router mode should be used (#48075)
Browse files Browse the repository at this point in the history
  • Loading branch information
jkarneges authored Oct 3, 2024
1 parent 70b5918 commit a9a0bb4
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 32 deletions.
95 changes: 65 additions & 30 deletions src/connmgr/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub struct BatchKey {

pub struct BatchGroup<'a, 'b> {
addr: &'b [u8],
use_router: bool,
ids: arena::ReusableVecHandle<'b, zhttppacket::Id<'a>>,
}

Expand All @@ -38,14 +39,25 @@ impl<'a> BatchGroup<'a, '_> {
self.addr
}

#[allow(dead_code)]
pub fn use_router(&self) -> bool {
self.use_router
}

pub fn ids(&self) -> &[zhttppacket::Id<'a>] {
&self.ids
}
}

struct AddrItem {
addr: ArrayVec<u8, FROM_MAX>,
use_router: bool,
keys: list::List,
}

pub struct Batch {
nodes: Slab<list::Node<usize>>,
addrs: Vec<(ArrayVec<u8, FROM_MAX>, list::List)>,
addrs: Vec<AddrItem>,
addr_index: usize,
group_ids: arena::ReusableVec,
last_group_ckeys: Vec<usize>,
Expand Down Expand Up @@ -80,7 +92,7 @@ impl Batch {
self.addr_index = 0;
}

pub fn add(&mut self, to_addr: &[u8], ckey: usize) -> Result<BatchKey, ()> {
pub fn add(&mut self, to_addr: &[u8], use_router: bool, ckey: usize) -> Result<BatchKey, ()> {
if self.nodes.len() == self.nodes.capacity() {
return Err(());
}
Expand All @@ -94,9 +106,9 @@ impl Batch {

let mut pos = self.addrs.len();

for (i, a) in self.addrs.iter().enumerate() {
if a.0.as_ref() == to_addr {
pos = i;
for (n, ai) in self.addrs.iter().enumerate() {
if ai.addr.as_slice() == to_addr && ai.use_router == use_router {
pos = n;
}
}

Expand All @@ -106,9 +118,13 @@ impl Batch {
}

// connection limits to_addr to FROM_MAX so this is guaranteed to succeed
let a = ArrayVec::try_from(to_addr).unwrap();
let addr = ArrayVec::try_from(to_addr).unwrap();

self.addrs.push((a, list::List::default()));
self.addrs.push(AddrItem {
addr,
use_router,
keys: list::List::default(),
});
} else {
// adding not allowed if take_group() has already moved past the index
if pos < self.addr_index {
Expand All @@ -117,7 +133,7 @@ impl Batch {
}

let nkey = self.nodes.insert(list::Node::new(ckey));
self.addrs[pos].1.push_back(&mut self.nodes, nkey);
self.addrs[pos].keys.push_back(&mut self.nodes, nkey);

Ok(BatchKey {
addr_index: pos,
Expand All @@ -127,7 +143,7 @@ impl Batch {

pub fn remove(&mut self, key: BatchKey) {
self.addrs[key.addr_index]
.1
.keys
.remove(&mut self.nodes, key.nkey);
self.nodes.remove(key.nkey);
}
Expand All @@ -141,7 +157,7 @@ impl Batch {

while ids.is_empty() {
// find the next addr with items
while self.addr_index < addrs.len() && addrs[self.addr_index].1.is_empty() {
while self.addr_index < addrs.len() && addrs[self.addr_index].keys.is_empty() {
self.addr_index += 1;
}

Expand All @@ -151,7 +167,7 @@ impl Batch {
return None;
}

let keys = &mut addrs[self.addr_index].1;
let keys = &mut addrs[self.addr_index].keys;

self.last_group_ckeys.clear();
ids.clear();
Expand All @@ -173,9 +189,13 @@ impl Batch {
}
}

let addr = &addrs[self.addr_index].0;
let ai = &addrs[self.addr_index];

Some(BatchGroup { addr, ids })
Some(BatchGroup {
addr: &ai.addr,
use_router: ai.use_router,
ids,
})
}

pub fn last_group_ckeys(&self) -> &[usize] {
Expand All @@ -189,20 +209,21 @@ mod tests {

#[test]
fn add_take() {
let ids = ["id-1", "id-2", "id-3"];
let mut batch = Batch::new(3);
let ids = ["id-1", "id-2", "id-3", "id-4"];
let mut batch = Batch::new(4);

assert_eq!(batch.capacity(), 3);
assert_eq!(batch.capacity(), 4);
assert_eq!(batch.len(), 0);
assert!(batch.last_group_ckeys().is_empty());

assert!(batch.add(b"addr-a", 1).is_ok());
assert!(batch.add(b"addr-a", 2).is_ok());
assert!(batch.add(b"addr-b", 3).is_ok());
assert_eq!(batch.len(), 3);
assert!(batch.add(b"addr-a", false, 1).is_ok());
assert!(batch.add(b"addr-a", false, 2).is_ok());
assert!(batch.add(b"addr-b", false, 3).is_ok());
assert!(batch.add(b"addr-b", true, 4).is_ok());
assert_eq!(batch.len(), 4);

assert!(batch.add(b"addr-c", 4).is_err());
assert_eq!(batch.len(), 3);
assert!(batch.add(b"addr-c", false, 5).is_err());
assert_eq!(batch.len(), 4);
assert_eq!(batch.is_empty(), false);

let group = batch
Expand All @@ -214,6 +235,7 @@ mod tests {
assert_eq!(group.ids()[1].id, b"id-2");
assert_eq!(group.ids()[1].seq, Some(0));
assert_eq!(group.addr(), b"addr-a");
assert!(!group.use_router());
drop(group);
assert_eq!(batch.is_empty(), false);
assert_eq!(batch.last_group_ckeys(), &[1, 2]);
Expand All @@ -225,23 +247,36 @@ mod tests {
assert_eq!(group.ids()[0].id, b"id-3");
assert_eq!(group.ids()[0].seq, Some(0));
assert_eq!(group.addr(), b"addr-b");
assert!(!group.use_router());
drop(group);
assert_eq!(batch.is_empty(), true);
assert_eq!(batch.is_empty(), false);
assert_eq!(batch.last_group_ckeys(), &[3]);

let group = batch
.take_group(|ckey| Some((ids[ckey - 1].as_bytes(), 0)))
.unwrap();
assert_eq!(group.ids().len(), 1);
assert_eq!(group.ids()[0].id, b"id-4");
assert_eq!(group.ids()[0].seq, Some(0));
assert_eq!(group.addr(), b"addr-b");
assert!(group.use_router());
drop(group);
assert_eq!(batch.is_empty(), true);
assert_eq!(batch.last_group_ckeys(), &[4]);

assert!(batch
.take_group(|ckey| Some((ids[ckey - 1].as_bytes(), 0)))
.is_none());
assert_eq!(batch.last_group_ckeys(), &[3]);
assert_eq!(batch.last_group_ckeys(), &[4]);
}

#[test]
fn add_remove_take() {
let ids = ["id-1", "id-2", "id-3"];
let mut batch = Batch::new(3);

let bkey = batch.add(b"addr-a", 1).unwrap();
assert!(batch.add(b"addr-b", 2).is_ok());
let bkey = batch.add(b"addr-a", false, 1).unwrap();
assert!(batch.add(b"addr-b", false, 2).is_ok());
assert_eq!(batch.len(), 2);
batch.remove(bkey);
assert_eq!(batch.len(), 1);
Expand All @@ -256,7 +291,7 @@ mod tests {
drop(group);
assert_eq!(batch.is_empty(), true);

assert!(batch.add(b"addr-a", 3).is_ok());
assert!(batch.add(b"addr-a", false, 3).is_ok());
assert_eq!(batch.len(), 1);
assert!(!batch.is_empty());

Expand All @@ -276,9 +311,9 @@ mod tests {
let ids = ["id-1", "id-2", "id-3"];
let mut batch = Batch::new(3);

assert!(batch.add(b"addr-a", 1).is_ok());
assert!(batch.add(b"addr-b", 2).is_ok());
assert!(batch.add(b"addr-b", 3).is_ok());
assert!(batch.add(b"addr-a", false, 1).is_ok());
assert!(batch.add(b"addr-b", false, 2).is_ok());
assert!(batch.add(b"addr-b", false, 3).is_ok());

let group = batch
.take_group(|ckey| {
Expand Down
2 changes: 1 addition & 1 deletion src/connmgr/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ impl Connections {
None => return Err(()),
};

let bkey = items.batch.add(addr, ckey)?;
let bkey = items.batch.add(addr, false, ckey)?;

ci.batch_key = Some(bkey);

Expand Down
2 changes: 1 addition & 1 deletion src/connmgr/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ impl Connections {
None => return Err(()),
};

let bkey = items.batch.add(addr, ckey)?;
let bkey = items.batch.add(addr, false, ckey)?;

ci.batch_key = Some(bkey);

Expand Down

0 comments on commit a9a0bb4

Please sign in to comment.