diff --git a/datadriven/src/line_sparser.rs b/datadriven/src/line_sparser.rs index 3f364ada..4dde527d 100644 --- a/datadriven/src/line_sparser.rs +++ b/datadriven/src/line_sparser.rs @@ -80,7 +80,7 @@ fn split_directives(line: &str) -> Result> { let mut line = line; while !line.is_empty() { - if let Some(l) = RE.captures(&line) { + if let Some(l) = RE.captures(line) { // get first captures let (first, last) = line.split_at(l[0].len()); res.push(first.trim().to_string()); diff --git a/examples/five_mem_node/main.rs b/examples/five_mem_node/main.rs index 9f9cf9bb..e825adb0 100644 --- a/examples/five_mem_node/main.rs +++ b/examples/five_mem_node/main.rs @@ -224,7 +224,7 @@ impl Node { fn step(&mut self, msg: Message, logger: &slog::Logger) { if self.raft_group.is_none() { if is_initial_msg(&msg) { - self.initialize_raft_from_message(&msg, &logger); + self.initialize_raft_from_message(&msg, logger); } else { return; } @@ -296,7 +296,7 @@ fn on_ready( // insert them into the kv engine. let data = str::from_utf8(&entry.data).unwrap(); let reg = Regex::new("put ([0-9]+) (.+)").unwrap(); - if let Some(caps) = reg.captures(&data) { + if let Some(caps) = reg.captures(data) { kv_pairs.insert(caps[1].parse().unwrap(), caps[2].to_string()); } } diff --git a/examples/single_mem_node/main.rs b/examples/single_mem_node/main.rs index b287f88d..a2e08113 100644 --- a/examples/single_mem_node/main.rs +++ b/examples/single_mem_node/main.rs @@ -151,7 +151,7 @@ fn on_ready(raft_group: &mut RawNode, cbs: &mut HashMap { @@ -1563,7 +1563,7 @@ fn test_recv_msg_request_vote_for_type(msg_type: MessageType, l: &Logger) { let store = MemStorage::new_with_conf_state((vec![1], vec![])); let ents = &[empty_entry(2, 1), empty_entry(2, 2)]; store.wl().append(ents).unwrap(); - let mut sm = new_test_raft(1, vec![1], 10, 1, store, &l); + let mut sm = new_test_raft(1, vec![1], 10, 1, store, l); sm.state = state; sm.vote = vote_for; @@ -1796,11 +1796,11 @@ fn test_candidate_reset_term_msg_append() { // MsgHeartbeat or MsgAppend from leader, "step" resets the term // with leader's and reverts back to follower. fn test_candidate_reset_term(message_type: MessageType, l: &Logger) { - let a = new_test_raft(1, vec![1, 2, 3], 10, 1, new_storage(), &l); - let b = new_test_raft(2, vec![1, 2, 3], 10, 1, new_storage(), &l); - let c = new_test_raft(3, vec![1, 2, 3], 10, 1, new_storage(), &l); + let a = new_test_raft(1, vec![1, 2, 3], 10, 1, new_storage(), l); + let b = new_test_raft(2, vec![1, 2, 3], 10, 1, new_storage(), l); + let c = new_test_raft(3, vec![1, 2, 3], 10, 1, new_storage(), l); - let mut nt = Network::new(vec![Some(a), Some(b), Some(c)], &l); + let mut nt = Network::new(vec![Some(a), Some(b), Some(c)], l); nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]); @@ -4107,15 +4107,15 @@ fn new_prevote_migration_cluster(l: &Logger) -> Network { // We intentionally do not enable pre_vote for n3, this is done so in order // to simulate a rolling restart process where it's possible to have a mixed // version cluster with replicas with pre_vote enabled, and replicas without. - let mut n1 = new_test_raft_with_prevote(1, vec![1, 2, 3], 10, 1, new_storage(), true, &l); - let mut n2 = new_test_raft_with_prevote(2, vec![1, 2, 3], 10, 1, new_storage(), true, &l); - let mut n3 = new_test_raft_with_prevote(3, vec![1, 2, 3], 10, 1, new_storage(), false, &l); + let mut n1 = new_test_raft_with_prevote(1, vec![1, 2, 3], 10, 1, new_storage(), true, l); + let mut n2 = new_test_raft_with_prevote(2, vec![1, 2, 3], 10, 1, new_storage(), true, l); + let mut n3 = new_test_raft_with_prevote(3, vec![1, 2, 3], 10, 1, new_storage(), false, l); n1.become_follower(1, INVALID_ID); n2.become_follower(1, INVALID_ID); n3.become_follower(1, INVALID_ID); - let mut nt = Network::new(vec![Some(n1), Some(n2), Some(n3)], &l); + let mut nt = Network::new(vec![Some(n1), Some(n2), Some(n3)], l); nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]); @@ -4803,7 +4803,7 @@ fn prepare_request_snapshot() -> (Network, Snapshot) { .wl() .apply_snapshot(new_snapshot(11, 11, ids.clone())) .unwrap(); - let mut raft = new_test_raft(id, ids, 5, 1, store, &l); + let mut raft = new_test_raft(id, ids, 5, 1, store, l); raft.reset(11); raft } diff --git a/harness/tests/integration_cases/test_raft_flow_control.rs b/harness/tests/integration_cases/test_raft_flow_control.rs index 73355b1b..bc555839 100644 --- a/harness/tests/integration_cases/test_raft_flow_control.rs +++ b/harness/tests/integration_cases/test_raft_flow_control.rs @@ -239,7 +239,7 @@ fn test_msg_app_flow_control_with_freeing_resources() { assert_eq!(r.prs().get(2).unwrap().ins.count(), 0); assert_eq!(r.prs().get(3).unwrap().ins.count(), 2); - assert_eq!(r.inflight_buffers_size(), 512); + assert_eq!(r.inflight_buffers_size(), 4096); /* 1: cap=0/start=0/count=0/buffer=[] @@ -251,7 +251,7 @@ fn test_msg_app_flow_control_with_freeing_resources() { assert!(!r.prs().get(2).unwrap().ins.buffer_is_allocated()); assert_eq!(r.prs().get(2).unwrap().ins.count(), 0); - assert_eq!(r.inflight_buffers_size(), 256); + assert_eq!(r.inflight_buffers_size(), 2048); /* 1: cap=0/start=0/count=0/buffer=[] diff --git a/harness/tests/integration_cases/test_raft_paper.rs b/harness/tests/integration_cases/test_raft_paper.rs index 3e435839..22a29b34 100644 --- a/harness/tests/integration_cases/test_raft_paper.rs +++ b/harness/tests/integration_cases/test_raft_paper.rs @@ -77,7 +77,7 @@ fn test_leader_update_term_from_message() { // it immediately reverts to follower state. // Reference: section 5.1 fn test_update_term_from_message(state: StateRole, l: &Logger) { - let mut r = new_test_raft(1, vec![1, 2, 3], 10, 1, new_storage(), &l); + let mut r = new_test_raft(1, vec![1, 2, 3], 10, 1, new_storage(), l); match state { StateRole::Follower => r.become_follower(1, 2), StateRole::PreCandidate => r.become_pre_candidate(), diff --git a/harness/tests/integration_cases/test_raw_node.rs b/harness/tests/integration_cases/test_raw_node.rs index d9248572..e93dcab4 100644 --- a/harness/tests/integration_cases/test_raw_node.rs +++ b/harness/tests/integration_cases/test_raw_node.rs @@ -84,7 +84,7 @@ fn new_raw_node_with_config( .apply_snapshot(new_snapshot(1, 1, peers)) .unwrap(); } - RawNode::new(&config, storage, logger).unwrap() + RawNode::new(config, storage, logger).unwrap() } /// Ensures that RawNode::step ignore local message. @@ -860,7 +860,7 @@ fn test_raw_node_with_async_apply() { assert!(rd .ss() .map_or(false, |ss| ss.leader_id == raw_node.raft.leader_id)); - s.wl().append(&rd.entries()).unwrap(); + s.wl().append(rd.entries()).unwrap(); let _ = raw_node.advance(rd); let mut last_index = raw_node.raft.raft_log.last_index(); @@ -1085,7 +1085,7 @@ fn test_async_ready_leader() { assert!(rd .ss() .map_or(false, |ss| ss.leader_id == raw_node.raft.leader_id)); - s.wl().append(&rd.entries()).unwrap(); + s.wl().append(rd.entries()).unwrap(); let _ = raw_node.advance(rd); assert_eq!(raw_node.raft.term, 2); diff --git a/src/confchange/datadriven_test.rs b/src/confchange/datadriven_test.rs index 33d73f2d..169889eb 100644 --- a/src/confchange/datadriven_test.rs +++ b/src/confchange/datadriven_test.rs @@ -8,7 +8,7 @@ fn test_conf_change_data_driven() -> anyhow::Result<()> { walk("src/confchange/testdata", |path| -> anyhow::Result<()> { let logger = default_logger(); - let mut tr = ProgressTracker::new(10, default_logger()); + let mut tr = ProgressTracker::new(10); let mut idx = 0; run_test( diff --git a/src/raft.rs b/src/raft.rs index 1dfce98d..22fc694c 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -324,12 +324,7 @@ impl Raft { let learners = &conf_state.learners; let mut r = Raft { - prs: ProgressTracker::with_capacity( - voters.len(), - learners.len(), - c.max_inflight_msgs, - logger.clone(), - ), + prs: ProgressTracker::with_capacity(voters.len(), learners.len(), c.max_inflight_msgs), msgs: Default::default(), r: RaftCore { id: c.id, @@ -849,7 +844,7 @@ impl Raft { pub fn inflight_buffers_size(&self) -> usize { let mut total_size = 0; for (_, pr) in self.prs().iter() { - total_size += pr.ins.cap(); + total_size += pr.ins.buffer_capacity() * std::mem::size_of::(); } total_size } @@ -2853,4 +2848,12 @@ impl Raft { pr.ins.maybe_free_buffer(); } } + + /// To adjust `max_inflight_msgs` for the specified peer. + /// Set to `0` will disable the progress. + pub fn adjust_max_inflight_msgs(&mut self, target: u64, cap: usize) { + if let Some(pr) = self.mut_prs().get_mut(target) { + pr.ins.set_cap(cap); + } + } } diff --git a/src/storage.rs b/src/storage.rs index ec6dfd0d..36c260db 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -295,7 +295,7 @@ impl MemStorageCore { // Remove all entries overwritten by `ents`. let diff = ents[0].index - self.first_index(); self.entries.drain(diff as usize..); - self.entries.extend_from_slice(&ents); + self.entries.extend_from_slice(ents); Ok(()) } diff --git a/src/tracker.rs b/src/tracker.rs index 2d79c03e..5424e7bf 100644 --- a/src/tracker.rs +++ b/src/tracker.rs @@ -22,15 +22,12 @@ pub use self::inflights::Inflights; pub use self::progress::Progress; pub use self::state::ProgressState; -use slog::Logger; - use crate::confchange::{MapChange, MapChangeType}; use crate::eraftpb::ConfState; use crate::quorum::{AckedIndexer, Index, VoteResult}; use crate::{DefaultHashBuilder, HashMap, HashSet, JointConfig}; -use std::fmt::Debug; - use getset::Getters; +use std::fmt::Debug; /// Config reflects the configuration tracked in a ProgressTracker. #[derive(Clone, Debug, Default, PartialEq, Getters)] @@ -205,22 +202,16 @@ pub struct ProgressTracker { max_inflight: usize, group_commit: bool, - pub(crate) logger: Logger, } impl ProgressTracker { /// Creates a new ProgressTracker. - pub fn new(max_inflight: usize, logger: Logger) -> Self { - Self::with_capacity(0, 0, max_inflight, logger) + pub fn new(max_inflight: usize) -> Self { + Self::with_capacity(0, 0, max_inflight) } /// Create a progress set with the specified sizes already reserved. - pub fn with_capacity( - voters: usize, - learners: usize, - max_inflight: usize, - logger: Logger, - ) -> Self { + pub fn with_capacity(voters: usize, learners: usize, max_inflight: usize) -> Self { ProgressTracker { progress: HashMap::with_capacity_and_hasher( voters + learners, @@ -230,7 +221,6 @@ impl ProgressTracker { votes: HashMap::with_capacity_and_hasher(voters, DefaultHashBuilder::default()), max_inflight, group_commit: false, - logger, } } diff --git a/src/tracker/inflights.rs b/src/tracker/inflights.rs index bafea066..a9626d37 100644 --- a/src/tracker/inflights.rs +++ b/src/tracker/inflights.rs @@ -14,6 +14,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::Ordering; + /// A buffer of inflight messages. #[derive(Debug, PartialEq, Clone)] pub struct Inflights { @@ -27,6 +29,9 @@ pub struct Inflights { // capacity cap: usize, + + // To support dynamically change inflight size. + incoming_cap: Option, } impl Inflights { @@ -37,13 +42,50 @@ impl Inflights { start: 0, count: 0, cap, + incoming_cap: None, + } + } + + /// Adjust inflight buffer capacity. Set it to `0` will disable the progress. + // Calling it between `self.full()` and `self.add()` can cause a panic. + pub fn set_cap(&mut self, incoming_cap: usize) { + match self.cap.cmp(&incoming_cap) { + Ordering::Equal => self.incoming_cap = None, + Ordering::Less => { + if self.start + self.count <= self.cap { + if self.buffer.capacity() > 0 { + self.buffer.reserve(incoming_cap - self.buffer.len()); + } + } else { + debug_assert_eq!(self.cap, self.buffer.len()); + let mut buffer = Vec::with_capacity(incoming_cap); + buffer.extend_from_slice(&self.buffer[self.start..]); + buffer.extend_from_slice(&self.buffer[0..self.count - (self.cap - self.start)]); + self.buffer = buffer; + self.start = 0; + } + self.cap = incoming_cap; + self.incoming_cap = None; + } + Ordering::Greater => { + if self.count == 0 { + self.cap = incoming_cap; + self.incoming_cap = None; + self.start = 0; + if self.buffer.capacity() > 0 { + self.buffer = Vec::with_capacity(incoming_cap); + } + } else { + self.incoming_cap = Some(incoming_cap); + } + } } } /// Returns true if the inflights is full. #[inline] pub fn full(&self) -> bool { - self.count == self.cap + self.count == self.cap || self.incoming_cap.map_or(false, |cap| self.count >= cap) } /// Adds an inflight into inflights @@ -55,6 +97,7 @@ impl Inflights { if self.buffer.capacity() == 0 { debug_assert_eq!(self.count, 0); debug_assert_eq!(self.start, 0); + debug_assert!(self.incoming_cap.is_none()); self.buffer = Vec::with_capacity(self.cap); } @@ -98,6 +141,14 @@ impl Inflights { // free i inflights and set new start index self.count -= i; self.start = idx; + + if self.count == 0 { + if let Some(incoming_cap) = self.incoming_cap.take() { + self.start = 0; + self.cap = incoming_cap; + self.buffer = Vec::with_capacity(self.cap); + } + } } /// Frees the first buffer entry. @@ -113,6 +164,7 @@ impl Inflights { self.count = 0; self.start = 0; self.buffer = vec![]; + self.cap = self.incoming_cap.take().unwrap_or(self.cap); } // Number of inflight messages. It's for tests. @@ -122,10 +174,10 @@ impl Inflights { self.count } - // Capacity of inflight buffer. + // Capacity of the internal buffer. #[doc(hidden)] #[inline] - pub fn cap(&self) -> usize { + pub fn buffer_capacity(&self) -> usize { self.buffer.capacity() } @@ -133,12 +185,12 @@ impl Inflights { #[doc(hidden)] #[inline] pub fn buffer_is_allocated(&self) -> bool { - self.cap() > 0 + self.buffer_capacity() > 0 } /// Free unused memory #[inline] - pub(crate) fn maybe_free_buffer(&mut self) { + pub fn maybe_free_buffer(&mut self) { if self.count == 0 { self.start = 0; self.buffer = vec![]; @@ -163,6 +215,7 @@ mod tests { count: 5, buffer: vec![0, 1, 2, 3, 4], cap: 10, + incoming_cap: None, }; assert_eq!(inflight, wantin); @@ -176,6 +229,7 @@ mod tests { count: 10, buffer: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9], cap: 10, + incoming_cap: None, }; assert_eq!(inflight, wantin2); @@ -193,6 +247,7 @@ mod tests { count: 5, buffer: vec![0, 0, 0, 0, 0, 0, 1, 2, 3, 4], cap: 10, + incoming_cap: None, }; assert_eq!(inflight2, wantin21); @@ -206,6 +261,7 @@ mod tests { count: 10, buffer: vec![5, 6, 7, 8, 9, 0, 1, 2, 3, 4], cap: 10, + incoming_cap: None, }; assert_eq!(inflight2, wantin22); @@ -225,6 +281,7 @@ mod tests { count: 5, buffer: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9], cap: 10, + incoming_cap: None, }; assert_eq!(inflight, wantin); @@ -236,6 +293,7 @@ mod tests { count: 1, buffer: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9], cap: 10, + incoming_cap: None, }; assert_eq!(inflight, wantin2); @@ -251,6 +309,7 @@ mod tests { count: 2, buffer: vec![10, 11, 12, 13, 14, 5, 6, 7, 8, 9], cap: 10, + incoming_cap: None, }; assert_eq!(inflight, wantin3); @@ -262,6 +321,7 @@ mod tests { count: 0, buffer: vec![10, 11, 12, 13, 14, 5, 6, 7, 8, 9], cap: 10, + incoming_cap: None, }; assert_eq!(inflight, wantin4); @@ -281,8 +341,84 @@ mod tests { count: 9, buffer: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9], cap: 10, + incoming_cap: None, }; assert_eq!(inflight, wantin); } + + #[test] + fn test_inflights_set_cap() { + // Prepare 3 `Inflights` with 16 items, but start at 16, 112 and 120. + let mut inflights = Vec::with_capacity(3); + for &start in &[16, 112, 120] { + let mut inflight = Inflights::new(128); + (0..start).for_each(|i| inflight.add(i)); + inflight.free_to(start - 1); + (0..16).for_each(|i| inflight.add(i)); + assert_eq!(inflight.count(), 16); + assert_eq!(inflight.start, start as usize); + inflights.push(inflight); + } + + // Adjust cap to a larger value. + for (i, inflight) in inflights.iter_mut().enumerate() { + inflight.set_cap(1024); + assert_eq!(inflight.cap, 1024); + assert_eq!(inflight.incoming_cap, None); + assert_eq!(inflight.buffer_capacity(), 1024); + if i < 2 { + // The internal buffer is extended directly. + assert_ne!(inflight.start, 0); + } else { + // The internal buffer is re-allocated instead of extended. + assert_eq!(inflight.start, 0); + } + } + + // Prepare 3 `Inflights` with given `start`, `count` and `buffer_cap`. + let mut inflights = Vec::with_capacity(3); + for &(start, count, buffer_cap) in &[(1, 0, 0), (1, 0, 128), (1, 8, 128)] { + let mut inflight = Inflights::new(128); + inflight.start = start; + inflight.buffer = vec![0; buffer_cap]; + (0..count).for_each(|i| inflight.add(i)); + inflights.push(inflight); + } + + // Adjust cap to a less value. + for (i, inflight) in inflights.iter_mut().enumerate() { + inflight.set_cap(64); + if i == 0 || i == 1 { + assert_eq!(inflight.cap, 64); + assert_eq!(inflight.incoming_cap, None); + assert_eq!(inflight.start, 0); + if i == 0 { + assert_eq!(inflight.buffer.capacity(), 0) + } else { + assert_eq!(inflight.buffer.capacity(), 64) + } + } else { + assert_eq!(inflight.cap, 128); + assert_eq!(inflight.incoming_cap, Some(64)); + assert_eq!(inflight.start, 1); + assert_eq!(inflight.buffer.capacity(), 128) + } + } + + // `incoming_cap` can be cleared if the buffer is freed totally. + let mut inflight = inflights[2].clone(); + inflight.free_to(7); + assert_eq!(inflight.cap, 64); + assert_eq!(inflight.incoming_cap, None); + assert_eq!(inflight.start, 0); + + // `incoming_cap` can be cleared when `cap` is enlarged. + for &new_cap in &[128, 1024] { + let mut inflight = inflights[2].clone(); + inflight.set_cap(new_cap); + assert_eq!(inflight.cap, new_cap); + assert_eq!(inflight.incoming_cap, None); + } + } } diff --git a/src/util.rs b/src/util.rs index a3ae7206..9ad603d2 100644 --- a/src/util.rs +++ b/src/util.rs @@ -152,7 +152,7 @@ impl<'a> Union<'a> { /// The time complexity is O(n). pub fn len(&self) -> usize { // Usually, second is empty. - self.first.len() + self.second.len() - self.second.intersection(&self.first).count() + self.first.len() + self.second.len() - self.second.intersection(self.first).count() } }