Skip to content

Commit

Permalink
support to adjust max inflight msgs (#450)
Browse files Browse the repository at this point in the history
Signed-off-by: qupeng <[email protected]>
  • Loading branch information
hicqu authored Aug 4, 2021
1 parent fa0a7c8 commit 15b988a
Show file tree
Hide file tree
Showing 13 changed files with 180 additions and 51 deletions.
2 changes: 1 addition & 1 deletion datadriven/src/line_sparser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ fn split_directives(line: &str) -> Result<Vec<String>> {

let mut line = line;
while !line.is_empty() {
if let Some(l) = RE.captures(&line) {
if let Some(l) = RE.captures(line) {
// get first captures
let (first, last) = line.split_at(l[0].len());
res.push(first.trim().to_string());
Expand Down
4 changes: 2 additions & 2 deletions examples/five_mem_node/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ impl Node {
fn step(&mut self, msg: Message, logger: &slog::Logger) {
if self.raft_group.is_none() {
if is_initial_msg(&msg) {
self.initialize_raft_from_message(&msg, &logger);
self.initialize_raft_from_message(&msg, logger);
} else {
return;
}
Expand Down Expand Up @@ -296,7 +296,7 @@ fn on_ready(
// insert them into the kv engine.
let data = str::from_utf8(&entry.data).unwrap();
let reg = Regex::new("put ([0-9]+) (.+)").unwrap();
if let Some(caps) = reg.captures(&data) {
if let Some(caps) = reg.captures(data) {
kv_pairs.insert(caps[1].parse().unwrap(), caps[2].to_string());
}
}
Expand Down
2 changes: 1 addition & 1 deletion examples/single_mem_node/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ fn on_ready(raft_group: &mut RawNode<MemStorage>, cbs: &mut HashMap<u8, ProposeC

if !ready.entries().is_empty() {
// Append entries to the Raft log.
store.wl().append(&ready.entries()).unwrap();
store.wl().append(ready.entries()).unwrap();
}

if let Some(hs) = ready.hs() {
Expand Down
24 changes: 12 additions & 12 deletions harness/tests/integration_cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ fn test_progress_flow_control() {

// While node 2 is in probe state, propose a bunch of entries.
r.mut_prs().get_mut(2).unwrap().become_probe();
let data: String = std::iter::repeat('a').take(1000).collect();
let data: String = "a".repeat(1000);
for _ in 0..10 {
let msg = new_message_with_entries(
1,
Expand Down Expand Up @@ -671,7 +671,7 @@ fn test_vote_from_any_state_for_type(vt: MessageType, l: &Logger) {
StateRole::Leader,
];
for state in all_states {
let mut r = new_test_raft(1, vec![1, 2, 3], 10, 1, new_storage(), &l);
let mut r = new_test_raft(1, vec![1, 2, 3], 10, 1, new_storage(), l);
r.term = 1;
match state {
StateRole::Follower => {
Expand Down Expand Up @@ -1563,7 +1563,7 @@ fn test_recv_msg_request_vote_for_type(msg_type: MessageType, l: &Logger) {
let store = MemStorage::new_with_conf_state((vec![1], vec![]));
let ents = &[empty_entry(2, 1), empty_entry(2, 2)];
store.wl().append(ents).unwrap();
let mut sm = new_test_raft(1, vec![1], 10, 1, store, &l);
let mut sm = new_test_raft(1, vec![1], 10, 1, store, l);
sm.state = state;
sm.vote = vote_for;

Expand Down Expand Up @@ -1796,11 +1796,11 @@ fn test_candidate_reset_term_msg_append() {
// MsgHeartbeat or MsgAppend from leader, "step" resets the term
// with leader's and reverts back to follower.
fn test_candidate_reset_term(message_type: MessageType, l: &Logger) {
let a = new_test_raft(1, vec![1, 2, 3], 10, 1, new_storage(), &l);
let b = new_test_raft(2, vec![1, 2, 3], 10, 1, new_storage(), &l);
let c = new_test_raft(3, vec![1, 2, 3], 10, 1, new_storage(), &l);
let a = new_test_raft(1, vec![1, 2, 3], 10, 1, new_storage(), l);
let b = new_test_raft(2, vec![1, 2, 3], 10, 1, new_storage(), l);
let c = new_test_raft(3, vec![1, 2, 3], 10, 1, new_storage(), l);

let mut nt = Network::new(vec![Some(a), Some(b), Some(c)], &l);
let mut nt = Network::new(vec![Some(a), Some(b), Some(c)], l);

nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);

Expand Down Expand Up @@ -4107,15 +4107,15 @@ fn new_prevote_migration_cluster(l: &Logger) -> Network {
// We intentionally do not enable pre_vote for n3, this is done so in order
// to simulate a rolling restart process where it's possible to have a mixed
// version cluster with replicas with pre_vote enabled, and replicas without.
let mut n1 = new_test_raft_with_prevote(1, vec![1, 2, 3], 10, 1, new_storage(), true, &l);
let mut n2 = new_test_raft_with_prevote(2, vec![1, 2, 3], 10, 1, new_storage(), true, &l);
let mut n3 = new_test_raft_with_prevote(3, vec![1, 2, 3], 10, 1, new_storage(), false, &l);
let mut n1 = new_test_raft_with_prevote(1, vec![1, 2, 3], 10, 1, new_storage(), true, l);
let mut n2 = new_test_raft_with_prevote(2, vec![1, 2, 3], 10, 1, new_storage(), true, l);
let mut n3 = new_test_raft_with_prevote(3, vec![1, 2, 3], 10, 1, new_storage(), false, l);

n1.become_follower(1, INVALID_ID);
n2.become_follower(1, INVALID_ID);
n3.become_follower(1, INVALID_ID);

let mut nt = Network::new(vec![Some(n1), Some(n2), Some(n3)], &l);
let mut nt = Network::new(vec![Some(n1), Some(n2), Some(n3)], l);

nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);

Expand Down Expand Up @@ -4803,7 +4803,7 @@ fn prepare_request_snapshot() -> (Network, Snapshot) {
.wl()
.apply_snapshot(new_snapshot(11, 11, ids.clone()))
.unwrap();
let mut raft = new_test_raft(id, ids, 5, 1, store, &l);
let mut raft = new_test_raft(id, ids, 5, 1, store, l);
raft.reset(11);
raft
}
Expand Down
4 changes: 2 additions & 2 deletions harness/tests/integration_cases/test_raft_flow_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ fn test_msg_app_flow_control_with_freeing_resources() {

assert_eq!(r.prs().get(2).unwrap().ins.count(), 0);
assert_eq!(r.prs().get(3).unwrap().ins.count(), 2);
assert_eq!(r.inflight_buffers_size(), 512);
assert_eq!(r.inflight_buffers_size(), 4096);

/*
1: cap=0/start=0/count=0/buffer=[]
Expand All @@ -251,7 +251,7 @@ fn test_msg_app_flow_control_with_freeing_resources() {

assert!(!r.prs().get(2).unwrap().ins.buffer_is_allocated());
assert_eq!(r.prs().get(2).unwrap().ins.count(), 0);
assert_eq!(r.inflight_buffers_size(), 256);
assert_eq!(r.inflight_buffers_size(), 2048);

/*
1: cap=0/start=0/count=0/buffer=[]
Expand Down
2 changes: 1 addition & 1 deletion harness/tests/integration_cases/test_raft_paper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ fn test_leader_update_term_from_message() {
// it immediately reverts to follower state.
// Reference: section 5.1
fn test_update_term_from_message(state: StateRole, l: &Logger) {
let mut r = new_test_raft(1, vec![1, 2, 3], 10, 1, new_storage(), &l);
let mut r = new_test_raft(1, vec![1, 2, 3], 10, 1, new_storage(), l);
match state {
StateRole::Follower => r.become_follower(1, 2),
StateRole::PreCandidate => r.become_pre_candidate(),
Expand Down
6 changes: 3 additions & 3 deletions harness/tests/integration_cases/test_raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ fn new_raw_node_with_config(
.apply_snapshot(new_snapshot(1, 1, peers))
.unwrap();
}
RawNode::new(&config, storage, logger).unwrap()
RawNode::new(config, storage, logger).unwrap()
}

/// Ensures that RawNode::step ignore local message.
Expand Down Expand Up @@ -860,7 +860,7 @@ fn test_raw_node_with_async_apply() {
assert!(rd
.ss()
.map_or(false, |ss| ss.leader_id == raw_node.raft.leader_id));
s.wl().append(&rd.entries()).unwrap();
s.wl().append(rd.entries()).unwrap();
let _ = raw_node.advance(rd);

let mut last_index = raw_node.raft.raft_log.last_index();
Expand Down Expand Up @@ -1085,7 +1085,7 @@ fn test_async_ready_leader() {
assert!(rd
.ss()
.map_or(false, |ss| ss.leader_id == raw_node.raft.leader_id));
s.wl().append(&rd.entries()).unwrap();
s.wl().append(rd.entries()).unwrap();
let _ = raw_node.advance(rd);

assert_eq!(raw_node.raft.term, 2);
Expand Down
2 changes: 1 addition & 1 deletion src/confchange/datadriven_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ fn test_conf_change_data_driven() -> anyhow::Result<()> {
walk("src/confchange/testdata", |path| -> anyhow::Result<()> {
let logger = default_logger();

let mut tr = ProgressTracker::new(10, default_logger());
let mut tr = ProgressTracker::new(10);
let mut idx = 0;

run_test(
Expand Down
17 changes: 10 additions & 7 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,12 +324,7 @@ impl<T: Storage> Raft<T> {
let learners = &conf_state.learners;

let mut r = Raft {
prs: ProgressTracker::with_capacity(
voters.len(),
learners.len(),
c.max_inflight_msgs,
logger.clone(),
),
prs: ProgressTracker::with_capacity(voters.len(), learners.len(), c.max_inflight_msgs),
msgs: Default::default(),
r: RaftCore {
id: c.id,
Expand Down Expand Up @@ -849,7 +844,7 @@ impl<T: Storage> Raft<T> {
pub fn inflight_buffers_size(&self) -> usize {
let mut total_size = 0;
for (_, pr) in self.prs().iter() {
total_size += pr.ins.cap();
total_size += pr.ins.buffer_capacity() * std::mem::size_of::<u64>();
}
total_size
}
Expand Down Expand Up @@ -2853,4 +2848,12 @@ impl<T: Storage> Raft<T> {
pr.ins.maybe_free_buffer();
}
}

/// To adjust `max_inflight_msgs` for the specified peer.
/// Set to `0` will disable the progress.
pub fn adjust_max_inflight_msgs(&mut self, target: u64, cap: usize) {
if let Some(pr) = self.mut_prs().get_mut(target) {
pr.ins.set_cap(cap);
}
}
}
2 changes: 1 addition & 1 deletion src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ impl MemStorageCore {
// Remove all entries overwritten by `ents`.
let diff = ents[0].index - self.first_index();
self.entries.drain(diff as usize..);
self.entries.extend_from_slice(&ents);
self.entries.extend_from_slice(ents);
Ok(())
}

Expand Down
18 changes: 4 additions & 14 deletions src/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,12 @@ pub use self::inflights::Inflights;
pub use self::progress::Progress;
pub use self::state::ProgressState;

use slog::Logger;

use crate::confchange::{MapChange, MapChangeType};
use crate::eraftpb::ConfState;
use crate::quorum::{AckedIndexer, Index, VoteResult};
use crate::{DefaultHashBuilder, HashMap, HashSet, JointConfig};
use std::fmt::Debug;

use getset::Getters;
use std::fmt::Debug;

/// Config reflects the configuration tracked in a ProgressTracker.
#[derive(Clone, Debug, Default, PartialEq, Getters)]
Expand Down Expand Up @@ -205,22 +202,16 @@ pub struct ProgressTracker {
max_inflight: usize,

group_commit: bool,
pub(crate) logger: Logger,
}

impl ProgressTracker {
/// Creates a new ProgressTracker.
pub fn new(max_inflight: usize, logger: Logger) -> Self {
Self::with_capacity(0, 0, max_inflight, logger)
pub fn new(max_inflight: usize) -> Self {
Self::with_capacity(0, 0, max_inflight)
}

/// Create a progress set with the specified sizes already reserved.
pub fn with_capacity(
voters: usize,
learners: usize,
max_inflight: usize,
logger: Logger,
) -> Self {
pub fn with_capacity(voters: usize, learners: usize, max_inflight: usize) -> Self {
ProgressTracker {
progress: HashMap::with_capacity_and_hasher(
voters + learners,
Expand All @@ -230,7 +221,6 @@ impl ProgressTracker {
votes: HashMap::with_capacity_and_hasher(voters, DefaultHashBuilder::default()),
max_inflight,
group_commit: false,
logger,
}
}

Expand Down
Loading

0 comments on commit 15b988a

Please sign in to comment.