diff --git a/consensus/tendermint/messages.go b/consensus/tendermint/messages.go new file mode 100644 index 0000000000..a87278502b --- /dev/null +++ b/consensus/tendermint/messages.go @@ -0,0 +1,107 @@ +package tendermint + +// Todo: Signature over the messages needs to be handled somewhere. There are 2 options: +// 1. Add the signature to each message and extend the Validator Set interface to include VerifyMessageSignature +// method. +// 2. The P2P layer signs the message before gossiping to other validators and verifies the signature before passing +// the message to the consensus engine. +// The benefit of P2P layer handling the verification of the signature is the that the consensus layer can assume +// the message is from a validator in the validator set. However, this means that the P2P layer would need to be aware +// of the validator set and would need access to the blockchain which may not be a good idea. + +type Message[V Hashable[H], H Hash, A Addr] interface { + Proposal[V, H, A] | Prevote[H, A] | Precommit[H, A] +} + +type Proposal[V Hashable[H], H Hash, A Addr] struct { + H height + R round + ValidRound int + Value *V + + Sender A +} + +type ( + Prevote[H Hash, A Addr] Vote[H, A] + Precommit[H Hash, A Addr] Vote[H, A] +) + +type Vote[H Hash, A Addr] struct { + H height + R round + ID *H + + Sender A +} + +// messages keep tracks of all the proposals, prevotes, precommits by creating a map structure as follows: +// height->round->address->[]Message + +// Todo: would the following representation of message be better: +// +// height -> round -> address -> ID -> Message +// How would we keep track of nil votes? In golan map key cannot be nil. +// It is not easy to calculate a zero value when dealing with generics. +type messages[V Hashable[H], H Hash, A Addr] struct { + proposals map[height]map[round]map[A][]Proposal[V, H, A] + prevotes map[height]map[round]map[A][]Prevote[H, A] + precommits map[height]map[round]map[A][]Precommit[H, A] +} + +func newMessages[V Hashable[H], H Hash, A Addr]() messages[V, H, A] { + return messages[V, H, A]{ + proposals: make(map[height]map[round]map[A][]Proposal[V, H, A]), + prevotes: make(map[height]map[round]map[A][]Prevote[H, A]), + precommits: make(map[height]map[round]map[A][]Precommit[H, A]), + } +} + +func addMessages[T any, A Addr](storage map[height]map[round]map[A][]T, msg T, a A, h height, r round) { + if _, ok := storage[h]; !ok { + storage[h] = make(map[round]map[A][]T) + } + + if _, ok := storage[h][r]; !ok { + storage[h][r] = make(map[A][]T) + } + + sendersMessages, ok := storage[h][r][a] + if !ok { + sendersMessages = []T{} + } + + storage[h][r][a] = append(sendersMessages, msg) +} + +// Todo: ensure duplicated messages are ignored. +func (m *messages[V, H, A]) addProposal(p Proposal[V, H, A]) { + addMessages(m.proposals, p, p.Sender, p.H, p.R) +} + +func (m *messages[V, H, A]) addPrevote(p Prevote[H, A]) { + addMessages(m.prevotes, p, p.Sender, p.H, p.R) +} + +func (m *messages[V, H, A]) addPrecommit(p Precommit[H, A]) { + addMessages(m.precommits, p, p.Sender, p.H, p.R) +} + +func (m *messages[V, H, A]) allMessages(h height, r round) (map[A][]Proposal[V, H, A], map[A][]Prevote[H, A], + map[A][]Precommit[H, A], +) { + // Todo: Should they be copied? 
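+	// Returning the inner maps hands callers the engine's own storage; if callers ever needed to mutate the
+	// result, a defensive copy (e.g. maps.Clone of each round map) could be made here instead.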
+ return m.proposals[h][r], m.prevotes[h][r], m.precommits[h][r] +} + +func (m *messages[V, H, A]) deleteHeightMessages(h height) { + delete(m.proposals, h) + delete(m.prevotes, h) + delete(m.precommits, h) +} + +func (m *messages[V, H, A]) deleteRoundMessages(h height, r round) { + delete(m.proposals[h], r) + delete(m.prevotes[h], r) + delete(m.precommits[h], r) +} diff --git a/consensus/tendermint/precommit.go b/consensus/tendermint/precommit.go new file mode 100644 index 0000000000..65f9bb81df --- /dev/null +++ b/consensus/tendermint/precommit.go @@ -0,0 +1,89 @@ +package tendermint + +import ( + "maps" + "slices" +) + +func (t *Tendermint[V, H, A]) handlePrecommit(p Precommit[H, A]) { + if p.H < t.state.h { + return + } + + if !handleFutureHeightMessage( + t, + p, + func(p Precommit[H, A]) height { return p.H }, + func(p Precommit[H, A]) round { return p.R }, + t.futureMessages.addPrecommit, + ) { + return + } + + if !handleFutureRoundMessage(t, p, func(p Precommit[H, A]) round { return p.R }, t.futureMessages.addPrecommit) { + return + } + + t.messages.addPrecommit(p) + + proposalsForHR, _, precommitsForHR := t.messages.allMessages(p.H, p.R) + + if t.line49WhenPrecommitIsReceived(p, proposalsForHR, precommitsForHR) { + return + } + + t.line47(p, precommitsForHR) +} + +/* +Check the upon condition on line 49: + + 49: upon {PROPOSAL, h_p, r, v, *} from proposer(h_p, r) AND 2f + 1 {PRECOMMIT, h_p, r, id(v)} while decision_p[h_p] = nil do + 50: if valid(v) then + 51: decisionp[hp] = v + 52: h_p ← h_p + 1 + 53: reset lockedRound_p, lockedValue_p, validRound_p and validValue_p to initial values and empty message log + 54: StartRound(0) + +Fetching the relevant proposal implies the sender of the proposal was the proposer for that +height and round. Also, since only the proposals with valid value are added to the message set, the +validity of the proposal can be skipped. + +There is no need to check decision_p[h_p] = nil since it is implied that decision are made +sequentially, i.e. x, x+1, x+2... . 
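+The p.ID != nil guard filters out nil precommits early: a nil ID can never match a proposal's value id, so this
+rule only applies to precommits that carry a value.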
+*/ +func (t *Tendermint[V, H, A]) line49WhenPrecommitIsReceived(p Precommit[H, A], proposalsForHR map[A][]Proposal[V, H, + A], precommitsForHR map[A][]Precommit[H, A], +) bool { + if p.ID != nil { + proposal := t.checkForMatchingProposalGivenPrecommit(p, proposalsForHR) + + precommits, vals := checkForQuorumPrecommit[H, A](precommitsForHR, *p.ID) + + if proposal != nil && t.validatorSetVotingPower(vals) >= q(t.validators.TotalVotingPower(p.H)) { + t.blockchain.Commit(t.state.h, *proposal.Value, precommits) + + t.messages.deleteHeightMessages(t.state.h) + t.state.h++ + t.startRound(0) + + return true + } + } + return false +} + +/* +Check the upon condition on line 47: + + 47: upon 2f + 1 {PRECOMMIT, h_p, round_p, ∗} for the first time do + 48: schedule OnTimeoutPrecommit(h_p , round_p) to be executed after timeoutPrecommit(round_p) +*/ +func (t *Tendermint[V, H, A]) line47(p Precommit[H, A], precommitsForHR map[A][]Precommit[H, A]) { + vals := slices.Collect(maps.Keys(precommitsForHR)) + if p.R == t.state.r && !t.state.timeoutPrecommitScheduled && + t.validatorSetVotingPower(vals) >= q(t.validators.TotalVotingPower(p.H)) { + t.scheduleTimeout(t.timeoutPrecommit(p.R), precommit, p.H, p.R) + t.state.timeoutPrecommitScheduled = true + } +} diff --git a/consensus/tendermint/prevote.go b/consensus/tendermint/prevote.go new file mode 100644 index 0000000000..e594ed564e --- /dev/null +++ b/consensus/tendermint/prevote.go @@ -0,0 +1,187 @@ +package tendermint + +import ( + "maps" + "slices" +) + +func (t *Tendermint[V, H, A]) handlePrevote(p Prevote[H, A]) { + if p.H < t.state.h { + return + } + + if !handleFutureHeightMessage( + t, + p, + func(p Prevote[H, A]) height { return p.H }, + func(p Prevote[H, A]) round { return p.R }, + t.futureMessages.addPrevote, + ) { + return + } + + if !handleFutureRoundMessage(t, p, func(p Prevote[H, A]) round { return p.R }, t.futureMessages.addPrevote) { + return + } + + t.messages.addPrevote(p) + + proposalsForHR, prevotesForHR, _ := t.messages.allMessages(p.H, p.R) + + t.line28WhenPrevoteIsReceived(p, prevotesForHR) + + if p.R == t.state.r { + t.line34(p, prevotesForHR) + t.line44(p, prevotesForHR) + + t.line36WhenPrevoteIsReceived(p, proposalsForHR, prevotesForHR) + } +} + +/* +Check the upon condition on line 28: + + 28: upon {PROPOSAL, h_p, round_p, v, vr} from proposer(h_p, round_p) AND 2f + 1 {PREVOTE,h_p, vr, id(v)} while + step_p = propose ∧ (vr ≥ 0 ∧ vr < round_p) do + 29: if valid(v) ∧ (lockedRound_p ≤ vr ∨ lockedValue_p = v) then + 30: broadcast {PREVOTE, hp, round_p, id(v)} + 31: else + 32: broadcast {PREVOTE, hp, round_p, nil} + 33: step_p ← prevote + +Fetching the relevant proposal implies the sender of the proposal was the proposer for that +height and round. Also, since only the proposals with valid value are added to the message set, the +validity of the proposal can be skipped. + +Calculating quorum of prevotes is more resource intensive than checking other condition on line 28, +therefore, it is checked in a subsequent if statement. 
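+Note that the prevote quorum is counted in round vr (the round the received prevote belongs to), while the
+matching proposal is looked up in the current round round_p.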
+*/ +func (t *Tendermint[V, H, A]) line28WhenPrevoteIsReceived(p Prevote[H, A], prevotesForHR map[A][]Prevote[H, A]) { + // vr >= 0 doesn't need to be checked since vr is a uint + if vr := p.R; p.ID != nil && t.state.s == propose && vr < t.state.r { + cr := t.state.r + + proposalsForHCR, _, _ := t.messages.allMessages(p.H, cr) + + proposal := t.checkForMatchingProposalGivenPrevote(p, proposalsForHCR) + vals := checkForQuorumPrevotesGivenPrevote(p, prevotesForHR) + + if proposal != nil && t.validatorSetVotingPower(vals) >= q(t.validators.TotalVotingPower(p.H)) { + vote := Prevote[H, A]{ + H: t.state.h, + R: t.state.r, + ID: nil, + Sender: t.nodeAddr, + } + + if t.state.lockedRound >= int(vr) || (*t.state.lockedValue).Hash() == *p.ID { + vote.ID = p.ID + } + + t.messages.addPrevote(vote) + t.broadcasters.PrevoteBroadcaster.Broadcast(vote) + t.state.s = prevote + } + } +} + +/* +Check the upon condition on line 34: + + 34: upon 2f + 1 {PREVOTE, h_p, round_p, ∗} while step_p = prevote for the first time do + 35: schedule OnTimeoutPrevote(h_p, round_p) to be executed after timeoutPrevote(round_p) +*/ +func (t *Tendermint[V, H, A]) line34(p Prevote[H, A], prevotesForHR map[A][]Prevote[H, A]) { + vals := slices.Collect(maps.Keys(prevotesForHR)) + if !t.state.timeoutPrevoteScheduled && t.state.s == prevote && + t.validatorSetVotingPower(vals) >= q(t.validators.TotalVotingPower(p.H)) { + t.scheduleTimeout(t.timeoutPrevote(p.R), prevote, p.H, p.R) + t.state.timeoutPrevoteScheduled = true + } +} + +/* +Check the upon condition on line 44: + + 44: upon 2f + 1 {PREVOTE, h_p, round_p, nil} while step_p = prevote do + 45: broadcast {PRECOMMIT, hp, roundp, nil} + 46: step_p ← precommit + +Line 36 and 44 for a round are mutually exclusive. +*/ +func (t *Tendermint[V, H, A]) line44(p Prevote[H, A], prevotesForHR map[A][]Prevote[H, A]) { + var vals []A + for addr, valPrevotes := range prevotesForHR { + for _, v := range valPrevotes { + if v.ID == nil { + vals = append(vals, addr) + } + } + } + + if t.state.s == prevote && t.validatorSetVotingPower(vals) >= q(t.validators.TotalVotingPower(p.H)) { + vote := Precommit[H, A]{ + H: t.state.h, + R: t.state.r, + ID: nil, + Sender: t.nodeAddr, + } + + t.messages.addPrecommit(vote) + t.broadcasters.PrecommitBroadcaster.Broadcast(vote) + t.state.s = precommit + } +} + +/* +Check upon condition on line 36: + + 36: upon {PROPOSAL, h_p, round_p, v, ∗} from proposer(h_p, round_p) AND 2f + 1 {PREVOTE, h_p, round_p, id(v)} while + valid(v) ∧ step_p ≥ prevote for the first time do + 37: if step_p = prevote then + 38: lockedValue_p ← v + 39: lockedRound_p ← round_p + 40: broadcast {PRECOMMIT, h_p, round_p, id(v))} + 41: step_p ← precommit + 42: validValue_p ← v + 43: validRound_p ← round_p + +Fetching the relevant proposal implies the sender of the proposal was the proposer for that +height and round. Also, since only the proposals with valid value are added to the message set, the +validity of the proposal can be skipped. + +Calculating quorum of prevotes is more resource intensive than checking other condition on line 36, +therefore, it is checked in a subsequent if statement. 
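+The lockedValueAndOrValidValueSet flag implements the "for the first time" requirement, therefore the body of
+this rule runs at most once per round.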
+*/ +func (t *Tendermint[V, H, A]) line36WhenPrevoteIsReceived(p Prevote[H, A], proposalsForHR map[A][]Proposal[V, H, A], + prevotesForHR map[A][]Prevote[H, A], +) { + if !t.state.lockedValueAndOrValidValueSet && t.state.s >= prevote { + proposal := t.checkForMatchingProposalGivenPrevote(p, proposalsForHR) + vals := checkForQuorumPrevotesGivenPrevote(p, prevotesForHR) + + if proposal != nil && t.validatorSetVotingPower(vals) >= q(t.validators.TotalVotingPower(p.H)) { + cr := t.state.r + + if t.state.s == prevote { + t.state.lockedValue = proposal.Value + t.state.lockedRound = int(cr) + + vote := Precommit[H, A]{ + H: t.state.h, + R: t.state.r, + ID: p.ID, + Sender: t.nodeAddr, + } + + t.messages.addPrecommit(vote) + t.broadcasters.PrecommitBroadcaster.Broadcast(vote) + t.state.s = precommit + } + + t.state.validValue = proposal.Value + t.state.validRound = int(cr) + t.state.lockedValueAndOrValidValueSet = true + } + } +} diff --git a/consensus/tendermint/propose.go b/consensus/tendermint/propose.go new file mode 100644 index 0000000000..f87e48742e --- /dev/null +++ b/consensus/tendermint/propose.go @@ -0,0 +1,214 @@ +package tendermint + +func (t *Tendermint[V, H, A]) handleProposal(p Proposal[V, H, A]) { + if p.H < t.state.h { + return + } + + if !handleFutureHeightMessage( + t, + p, + func(p Proposal[V, H, A]) height { return p.H }, + func(p Proposal[V, H, A]) round { return p.R }, + t.futureMessages.addProposal, + ) { + return + } + + if !handleFutureRoundMessage(t, p, func(p Proposal[V, H, A]) round { return p.R }, t.futureMessages.addProposal) { + return + } + + // The code below shouldn't panic because it is expected Proposal is well-formed. However, there need to be a way to + // distinguish between nil and zero value. This is expected to be handled by the p2p layer. + vID := (*p.Value).Hash() + validProposal := t.application.Valid(*p.Value) + proposalFromProposer := p.Sender == t.validators.Proposer(p.H, p.R) + vr := p.ValidRound + + if validProposal { + // Add the proposal to the message set even if the sender is not the proposer, + // this is because of slahsing purposes + t.messages.addProposal(p) + } + + _, prevotesForHR, precommitsForHR := t.messages.allMessages(p.H, p.R) + + if t.line49WhenProposalIsReceived(p, precommitsForHR, vID, validProposal, proposalFromProposer) { + return + } + + if p.R < t.state.r { + // Except line 49 all other upon condition which refer to the proposals expect to be acted upon + // when the current round is equal to the proposal's round. + return + } + + t.line22(vr, proposalFromProposer, validProposal, vID) + t.line28WhenProposalIsReceived(p, vr, proposalFromProposer, vID, validProposal) + t.line36WhenProposalIsReceived(p, validProposal, proposalFromProposer, prevotesForHR, vID) +} + +/* +Check the upon condition on line 49: + + 49: upon {PROPOSAL, h_p, r, v, *} from proposer(h_p, r) AND 2f + 1 {PRECOMMIT, h_p, r, id(v)} while decision_p[h_p] = nil do + 50: if valid(v) then + 51: decisionp[hp] = v + 52: h_p ← h_p + 1 + 53: reset lockedRound_p, lockedValue_p, validRound_p and validValue_p to initial values and empty message log + 54: StartRound(0) + + There is no need to check decision_p[h_p] = nil since it is implied that decision are made + sequentially, i.e. x, x+1, x+2... . The validity of the proposal value can be checked in the same if + statement since there is no else statement. 
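+Here the proposal has just arrived, so only the matching precommits need to be fetched from the message set; the
+symmetric case, where the final precommit arrives after the proposal, is handled in precommit.go.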
+*/ +func (t *Tendermint[V, H, A]) line49WhenProposalIsReceived(p Proposal[V, H, A], precommitsForHR map[A][]Precommit[H, + A], vID H, validProposal bool, proposalFromProposer bool, +) bool { + precommits, vals := checkForQuorumPrecommit[H, A](precommitsForHR, vID) + + if validProposal && proposalFromProposer && + t.validatorSetVotingPower(vals) >= q(t.validators.TotalVotingPower(p.H)) { + // After committing the block, how the new height and round is started needs to be coordinated + // with the synchronisation process. + t.blockchain.Commit(t.state.h, *p.Value, precommits) + + t.messages.deleteHeightMessages(t.state.h) + t.state.h++ + t.startRound(0) + + return true + } + return false +} + +/* +Check the upon condition on line 22: + + 22: upon {PROPOSAL, h_p, round_p, v, nil} from proposer(h_p, round_p) while step_p = propose do + 23: if valid(v) ∧ (lockedRound_p = −1 ∨ lockedValue_p = v) then + 24: broadcast {PREVOTE, h_p, round_p, id(v)} + 25: else + 26: broadcast {PREVOTE, h_p, round_p, nil} + 27: step_p ← prevote + +The implementation uses nil as -1 to avoid using int type. + +Since the value's id is expected to be unique the id can be used to compare the values. +*/ +func (t *Tendermint[V, H, A]) line22(vr int, proposalFromProposer, validProposal bool, vID H) { + if vr == -1 && proposalFromProposer && t.state.s == propose { + vote := Prevote[H, A]{ + H: t.state.h, + R: t.state.r, + ID: nil, + Sender: t.nodeAddr, + } + + if validProposal && (t.state.lockedRound == -1 || (*t.state.lockedValue).Hash() == vID) { + vote.ID = &vID + } + + t.messages.addPrevote(vote) + t.broadcasters.PrevoteBroadcaster.Broadcast(vote) + t.state.s = prevote + } +} + +/* +Check the upon condition on line 28: + + 28: upon {PROPOSAL, h_p, round_p, v, vr} from proposer(h_p, round_p) AND 2f + 1 {PREVOTE,h_p, vr, id(v)} while + step_p = propose ∧ (vr ≥ 0 ∧ vr < round_p) do + 29: if valid(v) ∧ (lockedRound_p ≤ vr ∨ lockedValue_p = v) then + 30: broadcast {PREVOTE, hp, round_p, id(v)} + 31: else + 32: broadcast {PREVOTE, hp, round_p, nil} + 33: step_p ← prevote + +Ideally, the condition on line 28 would be checked in a single if statement, however, +this cannot be done because valid round needs to be non-nil before the prevotes are fetched. 
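+Once vr is known to be a real round (vr != -1), the prevotes are fetched from round vr rather than from the
+current round.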
+*/ +func (t *Tendermint[V, H, A]) line28WhenProposalIsReceived(p Proposal[V, H, A], vr int, proposalFromProposer bool, + vID H, validProposal bool, +) { + if vr != -1 && proposalFromProposer && t.state.s == propose && vr >= 0 && vr < int(t.state.r) { + _, prevotesForHVr, _ := t.messages.allMessages(p.H, round(vr)) + + vals := checkQuorumPrevotesGivenProposalVID(prevotesForHVr, vID) + + if t.validatorSetVotingPower(vals) >= q(t.validators.TotalVotingPower(p.H)) { + vote := Prevote[H, A]{ + H: t.state.h, + R: t.state.r, + ID: nil, + Sender: t.nodeAddr, + } + + if validProposal && (t.state.lockedRound <= vr || (*t.state.lockedValue).Hash() == vID) { + vote.ID = &vID + } + + t.messages.addPrevote(vote) + t.broadcasters.PrevoteBroadcaster.Broadcast(vote) + t.state.s = prevote + } + } +} + +/* +Check upon condition on line 36: + + 36: upon {PROPOSAL, h_p, round_p, v, ∗} from proposer(h_p, round_p) AND 2f + 1 {PREVOTE, h_p, round_p, id(v)} while + valid(v) ∧ step_p ≥ prevote for the first time do + 37: if step_p = prevote then + 38: lockedValue_p ← v + 39: lockedRound_p ← round_p + 40: broadcast {PRECOMMIT, h_p, round_p, id(v))} + 41: step_p ← precommit + 42: validValue_p ← v + 43: validRound_p ← round_p + +The condition on line 36 can should be checked in a single if statement, however, +checking for quroum is more resource intensive than other conditions, therefore, they are checked +first. +*/ +func (t *Tendermint[V, H, A]) line36WhenProposalIsReceived(p Proposal[V, H, A], validProposal, + proposalFromProposer bool, prevotesForHR map[A][]Prevote[H, A], vID H, +) { + if validProposal && proposalFromProposer && !t.state.lockedValueAndOrValidValueSet && t.state.s >= prevote { + var vals []A + for addr, valPrevotes := range prevotesForHR { + for _, v := range valPrevotes { + if *v.ID == vID { + vals = append(vals, addr) + } + } + } + + if t.validatorSetVotingPower(vals) >= q(t.validators.TotalVotingPower(p.H)) { + cr := t.state.r + + if t.state.s == prevote { + t.state.lockedValue = p.Value + t.state.lockedRound = int(cr) + + vote := Precommit[H, A]{ + H: t.state.h, + R: t.state.r, + ID: &vID, + Sender: t.nodeAddr, + } + + t.messages.addPrecommit(vote) + t.broadcasters.PrecommitBroadcaster.Broadcast(vote) + t.state.s = precommit + } + + t.state.validValue = p.Value + t.state.validRound = int(cr) + t.state.lockedValueAndOrValidValueSet = true + } + } +} diff --git a/consensus/tendermint/propose_test.go b/consensus/tendermint/propose_test.go new file mode 100644 index 0000000000..fc9235a035 --- /dev/null +++ b/consensus/tendermint/propose_test.go @@ -0,0 +1,492 @@ +package tendermint + +import ( + "slices" + "testing" + "time" + + "github.com/NethermindEth/juno/core/felt" + "github.com/NethermindEth/juno/utils" + "github.com/stretchr/testify/assert" +) + +func TestPropose(t *testing.T) { + nodeAddr := new(felt.Felt).SetBytes([]byte("my node address")) + val2, val3, val4 := new(felt.Felt).SetUint64(2), new(felt.Felt).SetUint64(3), new(felt.Felt).SetUint64(4) + tm := func(r round) time.Duration { return time.Second } + + t.Run("Line 55 (Proposal): Start round r' when f+1 future round messages are received from round r'", func(t *testing.T) { + listeners, broadcasters := testListenersAndBroadcasters() + app, chain, vals := newApp(), newChain(), newVals() + + vals.addValidator(*val2) + vals.addValidator(*val3) + vals.addValidator(*val4) + vals.addValidator(*nodeAddr) + + algo := New[value, felt.Felt, felt.Felt](*nodeAddr, app, chain, vals, listeners, broadcasters, tm, tm, tm) + + expectedHeight := 
height(0) + rPrime, rPrimeVal := round(4), value(10) + val2Proposal := Proposal[value, felt.Felt, felt.Felt]{ + H: expectedHeight, + R: rPrime, + ValidRound: -1, + Value: &rPrimeVal, + Sender: *val2, + } + + val3Prevote := Prevote[felt.Felt, felt.Felt]{ + H: expectedHeight, + R: rPrime, + ID: utils.HeapPtr(rPrimeVal.Hash()), + Sender: *val3, + } + + algo.futureMessages.addPrevote(val3Prevote) + proposalListener := listeners.ProposalListener.(*senderAndReceiver[Proposal[value, felt.Felt, felt.Felt], + value, felt.Felt, felt.Felt]) + proposalListener.send(val2Proposal) + + algo.Start() + time.Sleep(5 * time.Millisecond) + algo.Stop() + + assert.Equal(t, 1, len(algo.messages.proposals[expectedHeight][rPrime][*val2])) + assert.Equal(t, val2Proposal, algo.messages.proposals[expectedHeight][rPrime][*val2][0]) + + assert.Equal(t, 1, len(algo.messages.prevotes[expectedHeight][rPrime][*val3])) + assert.Equal(t, val3Prevote, algo.messages.prevotes[expectedHeight][rPrime][*val3][0]) + + // The step is not propose because the proposal which is received in round r' leads to consensus + // engine broadcasting prevote to the proposal which changes the step from propose to prevote. + assert.Equal(t, prevote, algo.state.s) + assert.Equal(t, expectedHeight, algo.state.h) + assert.Equal(t, rPrime, algo.state.r) + }) + + t.Run("Line 55 (Prevote): Start round r' when f+1 future round messages are received from round r'", func(t *testing.T) { + listeners, broadcasters := testListenersAndBroadcasters() + app, chain, vals := newApp(), newChain(), newVals() + + vals.addValidator(*val2) + vals.addValidator(*val3) + vals.addValidator(*val4) + vals.addValidator(*nodeAddr) + + tm := func(r round) time.Duration { return 2 * time.Second } + + algo := New[value, felt.Felt, felt.Felt](*nodeAddr, app, chain, vals, listeners, broadcasters, tm, tm, tm) + + expectedHeight := height(0) + rPrime, rPrimeVal := round(4), value(10) + val2Prevote := Prevote[felt.Felt, felt.Felt]{ + H: expectedHeight, + R: rPrime, + ID: utils.HeapPtr(rPrimeVal.Hash()), + Sender: *val2, + } + + val3Prevote := Prevote[felt.Felt, felt.Felt]{ + H: expectedHeight, + R: rPrime, + ID: utils.HeapPtr(rPrimeVal.Hash()), + Sender: *val3, + } + + algo.futureMessages.addPrevote(val2Prevote) + prevoteListener := listeners.PrevoteListener.(*senderAndReceiver[Prevote[felt.Felt, felt.Felt], value, + felt.Felt, felt.Felt]) + prevoteListener.send(val3Prevote) + + algo.Start() + time.Sleep(5 * time.Millisecond) + algo.Stop() + + assert.Equal(t, 1, len(algo.messages.prevotes[expectedHeight][rPrime][*val2])) + assert.Equal(t, val2Prevote, algo.messages.prevotes[expectedHeight][rPrime][*val2][0]) + + assert.Equal(t, 1, len(algo.messages.prevotes[expectedHeight][rPrime][*val3])) + assert.Equal(t, val3Prevote, algo.messages.prevotes[expectedHeight][rPrime][*val3][0]) + + // The step here remains propose because a proposal is yet to be received to allow the node to send the + // prevote for it. 
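+		// Round r' is reached because val2 and val3 together hold voting power 2, which is greater than f = 1 for a
+		// 4-validator set (line 55).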
+ assert.Equal(t, propose, algo.state.s) + assert.Equal(t, expectedHeight, algo.state.h) + assert.Equal(t, rPrime, algo.state.r) + }) + + t.Run("Line 55 (Precommit): Start round r' when f+1 future round messages are received from round r'", func(t *testing.T) { + listeners, broadcasters := testListenersAndBroadcasters() + app, chain, vals := newApp(), newChain(), newVals() + + vals.addValidator(*val2) + vals.addValidator(*val3) + vals.addValidator(*val4) + vals.addValidator(*nodeAddr) + + algo := New[value, felt.Felt, felt.Felt](*nodeAddr, app, chain, vals, listeners, broadcasters, tm, tm, tm) + + expectedHeight := height(0) + rPrime := round(4) + round4Value := value(10) + val2Precommit := Precommit[felt.Felt, felt.Felt]{ + H: expectedHeight, + R: rPrime, + ID: utils.HeapPtr(round4Value.Hash()), + Sender: *val2, + } + + val3Prevote := Prevote[felt.Felt, felt.Felt]{ + H: expectedHeight, + R: rPrime, + ID: utils.HeapPtr(round4Value.Hash()), + Sender: *val3, + } + + algo.futureMessages.addPrevote(val3Prevote) + prevoteListener := listeners.PrecommitListener.(*senderAndReceiver[Precommit[felt.Felt, felt.Felt], value, + felt.Felt, felt.Felt]) + prevoteListener.send(val2Precommit) + + algo.Start() + time.Sleep(5 * time.Millisecond) + algo.Stop() + + assert.Equal(t, 1, len(algo.messages.precommits[expectedHeight][rPrime][*val2])) + assert.Equal(t, val2Precommit, algo.messages.precommits[expectedHeight][rPrime][*val2][0]) + + assert.Equal(t, 1, len(algo.messages.prevotes[expectedHeight][rPrime][*val3])) + assert.Equal(t, val3Prevote, algo.messages.prevotes[expectedHeight][rPrime][*val3][0]) + + // The step here remains propose because a proposal is yet to be received to allow the node to send the + // prevote for it. + assert.Equal(t, propose, algo.state.s) + assert.Equal(t, expectedHeight, algo.state.h) + assert.Equal(t, rPrime, algo.state.r) + }) + + t.Run("Line 47: schedule timeout precommit", func(t *testing.T) { + listeners, broadcasters := testListenersAndBroadcasters() + nodeAddr := new(felt.Felt).SetBytes([]byte("my node address")) + app, chain, vals := newApp(), newChain(), newVals() + + vals.addValidator(*val2) + vals.addValidator(*val3) + vals.addValidator(*val4) + vals.addValidator(*nodeAddr) + + algo := New[value, felt.Felt, felt.Felt](*nodeAddr, app, chain, vals, listeners, broadcasters, tm, tm, tm) + + val2Precommit := Precommit[felt.Felt, felt.Felt]{ + H: 0, + R: 0, + ID: utils.HeapPtr(value(10).Hash()), + Sender: *val2, + } + val3Precommit := Precommit[felt.Felt, felt.Felt]{ + H: 0, + R: 0, + ID: nil, + Sender: *val3, + } + val4Precommit := Precommit[felt.Felt, felt.Felt]{ + H: 0, + R: 0, + ID: nil, + Sender: *val4, + } + + algo.messages.addPrecommit(val2Precommit) + algo.messages.addPrecommit(val3Precommit) + + precommitListner := listeners.PrecommitListener.(*senderAndReceiver[Precommit[felt.Felt, felt.Felt], value, + felt.Felt, felt.Felt]) + precommitListner.send(val4Precommit) + + algo.Start() + time.Sleep(5 * time.Millisecond) + algo.Stop() + + assert.Equal(t, 2, len(algo.scheduledTms)) + assert.Contains(t, algo.scheduledTms, timeout{s: precommit, h: 0, r: 0}) + + assert.True(t, algo.state.timeoutPrecommitScheduled) + assert.Equal(t, propose, algo.state.s) + assert.Equal(t, height(0), algo.state.h) + assert.Equal(t, round(0), algo.state.r) + }) + + t.Run("Line 47: don't schedule timeout precommit multiple times", func(t *testing.T) { + listeners, broadcasters := testListenersAndBroadcasters() + nodeAddr := new(felt.Felt).SetBytes([]byte("my node address")) + app, chain, vals 
:= newApp(), newChain(), newVals() + + vals.addValidator(*val2) + vals.addValidator(*val3) + vals.addValidator(*val4) + vals.addValidator(*nodeAddr) + + algo := New[value, felt.Felt, felt.Felt](*nodeAddr, app, chain, vals, listeners, broadcasters, tm, tm, tm) + + nodePrecommit := Precommit[felt.Felt, felt.Felt]{ + H: 0, + R: 0, + ID: nil, + Sender: *nodeAddr, + } + val2Precommit := Precommit[felt.Felt, felt.Felt]{ + H: 0, + R: 0, + ID: utils.HeapPtr(value(10).Hash()), + Sender: *val2, + } + val3Precommit := Precommit[felt.Felt, felt.Felt]{ + H: 0, + R: 0, + ID: nil, + Sender: *val3, + } + val4Precommit := Precommit[felt.Felt, felt.Felt]{ + H: 0, + R: 0, + ID: nil, + Sender: *val4, + } + + algo.messages.addPrecommit(val2Precommit) + algo.messages.addPrecommit(val3Precommit) + + precommitListner := listeners.PrecommitListener.(*senderAndReceiver[Precommit[felt.Felt, felt.Felt], value, + felt.Felt, felt.Felt]) + precommitListner.send(val4Precommit) + precommitListner.send(nodePrecommit) + + algo.Start() + time.Sleep(5 * time.Millisecond) + algo.Stop() + + // The reason there are 2 timeouts is because the first timeout is the proposeTimeout which is immediately + // scheduled when nodes move to the next round, and it is not the proposer. + // If the precommitTimeout was scheduled more than once, then the len of scheduledTms would be more than 2. + assert.Equal(t, 2, len(algo.scheduledTms)) + assert.Contains(t, algo.scheduledTms, timeout{s: precommit, h: 0, r: 0}) + + assert.True(t, algo.state.timeoutPrecommitScheduled) + assert.Equal(t, propose, algo.state.s) + assert.Equal(t, height(0), algo.state.h) + assert.Equal(t, round(0), algo.state.r) + }) + + t.Run("OnTimeoutPrecommit: move to next round", func(t *testing.T) { + listeners, broadcasters := testListenersAndBroadcasters() + nodeAddr := new(felt.Felt).SetBytes([]byte("my node address")) + app, chain, vals := newApp(), newChain(), newVals() + tmPrecommit := func(r round) time.Duration { return time.Nanosecond } + + vals.addValidator(*val2) + vals.addValidator(*val3) + vals.addValidator(*val4) + vals.addValidator(*nodeAddr) + + algo := New[value, felt.Felt, felt.Felt](*nodeAddr, app, chain, vals, listeners, broadcasters, tm, tm, tmPrecommit) + + val2Precommit := Precommit[felt.Felt, felt.Felt]{ + H: 0, + R: 0, + ID: utils.HeapPtr(value(10).Hash()), + Sender: *val2, + } + val3Precommit := Precommit[felt.Felt, felt.Felt]{ + H: 0, + R: 0, + ID: nil, + Sender: *val3, + } + val4Precommit := Precommit[felt.Felt, felt.Felt]{ + H: 0, + R: 0, + ID: nil, + Sender: *val4, + } + + algo.messages.addPrecommit(val2Precommit) + algo.messages.addPrecommit(val3Precommit) + + precommitListner := listeners.PrecommitListener.(*senderAndReceiver[Precommit[felt.Felt, felt.Felt], value, + felt.Felt, felt.Felt]) + precommitListner.send(val4Precommit) + + algo.Start() + time.Sleep(5 * time.Millisecond) + algo.Stop() + + // The first timeout here is the nodes proposeTimeout from round 0, and since the precommit timout expired + // before the proposeTimeout it is still in the slice. It will only be deleted after its expiry. + // The second timeout here is the proposeTimeout for round 1, which is what we are interested in. 
+ assert.Equal(t, 2, len(algo.scheduledTms)) + assert.Contains(t, algo.scheduledTms, timeout{s: propose, h: 0, r: 1}) + + assert.False(t, algo.state.timeoutPrecommitScheduled) + assert.Equal(t, propose, algo.state.s) + assert.Equal(t, height(0), algo.state.h) + assert.Equal(t, round(1), algo.state.r) + }) + + t.Run("Line 49 (Proposal): commit the value", func(t *testing.T) { + listeners, broadcasters := testListenersAndBroadcasters() + nodeAddr := new(felt.Felt).SetBytes([]byte("my node address")) + app, chain, vals := newApp(), newChain(), newVals() + + vals.addValidator(*val2) + vals.addValidator(*val3) + vals.addValidator(*val4) + vals.addValidator(*nodeAddr) + + algo := New[value, felt.Felt, felt.Felt](*nodeAddr, app, chain, vals, listeners, broadcasters, tm, tm, tm) + + h, r := height(0), round(0) + + val := app.Value() + vID := val.Hash() + + val2Precommit := Precommit[felt.Felt, felt.Felt]{ + H: h, + R: r, + ID: &vID, + Sender: *val2, + } + val3Precommit := Precommit[felt.Felt, felt.Felt]{ + H: h, + R: r, + ID: &vID, + Sender: *val3, + } + val4Precommit := Precommit[felt.Felt, felt.Felt]{ + H: h, + R: r, + ID: &vID, + Sender: *val4, + } + + // The node has received all the precommits but has received the corresponding proposal + algo.messages.addPrecommit(val2Precommit) + algo.messages.addPrecommit(val3Precommit) + algo.messages.addPrecommit(val4Precommit) + + // since val2 is the proposer of round 0, the proposal arrives after the precommits + val2Proposal := Proposal[value, felt.Felt, felt.Felt]{ + H: h, + R: r, + ValidRound: -1, + Value: &val, + Sender: *val2, + } + + proposalListener := listeners.ProposalListener.(*senderAndReceiver[Proposal[value, felt.Felt, felt.Felt], + value, felt.Felt, felt.Felt]) + proposalListener.send(val2Proposal) + + algo.Start() + time.Sleep(5 * time.Millisecond) + algo.Stop() + + assert.Equal(t, 2, len(algo.scheduledTms)) + assert.Contains(t, algo.scheduledTms, timeout{s: propose, h: 1, r: 0}) + + assert.Equal(t, propose, algo.state.s) + assert.Equal(t, height(1), algo.state.h) + assert.Equal(t, round(0), algo.state.r) + + precommits := []Precommit[felt.Felt, felt.Felt]{val2Precommit, val3Precommit, val4Precommit} + assert.Equal(t, chain.decision[0], val) + for _, p := range chain.decisionCertificates[0] { + assert.True(t, slices.Contains(precommits, p)) + } + + assert.Equal(t, 0, len(algo.messages.proposals)) + assert.Equal(t, 0, len(algo.messages.prevotes)) + assert.Equal(t, 0, len(algo.messages.precommits)) + }) + + t.Run("Line 49 (Precommit): commit the value", func(t *testing.T) { + listeners, broadcasters := testListenersAndBroadcasters() + nodeAddr := new(felt.Felt).SetBytes([]byte("my node address")) + app, chain, vals := newApp(), newChain(), newVals() + + vals.addValidator(*val2) + vals.addValidator(*val3) + vals.addValidator(*val4) + vals.addValidator(*nodeAddr) + + algo := New[value, felt.Felt, felt.Felt](*nodeAddr, app, chain, vals, listeners, broadcasters, tm, tm, tm) + + h, r := height(0), round(0) + + val := app.Value() + vID := val.Hash() + + val2Precommit := Precommit[felt.Felt, felt.Felt]{ + H: h, + R: r, + ID: &vID, + Sender: *val2, + } + val2Proposal := Proposal[value, felt.Felt, felt.Felt]{ + H: h, + R: r, + ValidRound: -1, + Value: &val, + Sender: *val2, + } + val3Precommit := Precommit[felt.Felt, felt.Felt]{ + H: h, + R: r, + ID: &vID, + Sender: *val3, + } + + // The node has received all the precommits but has received the corresponding proposal + algo.messages.addPrecommit(val2Precommit) + 
algo.messages.addProposal(val2Proposal) + algo.messages.addPrecommit(val3Precommit) + + val4Precommit := Precommit[felt.Felt, felt.Felt]{ + H: h, + R: r, + ID: &vID, + Sender: *val4, + } + + precommitListner := listeners.PrecommitListener.(*senderAndReceiver[Precommit[felt.Felt, felt.Felt], + value, felt.Felt, felt.Felt]) + precommitListner.send(val4Precommit) + + algo.Start() + time.Sleep(5 * time.Millisecond) + algo.Stop() + + assert.Equal(t, 2, len(algo.scheduledTms)) + assert.Contains(t, algo.scheduledTms, timeout{s: propose, h: 1, r: 0}) + + assert.Equal(t, propose, algo.state.s) + assert.Equal(t, height(1), algo.state.h) + assert.Equal(t, round(0), algo.state.r) + + precommits := []Precommit[felt.Felt, felt.Felt]{val2Precommit, val3Precommit, val4Precommit} + assert.Equal(t, chain.decision[0], val) + for _, p := range chain.decisionCertificates[0] { + assert.True(t, slices.Contains(precommits, p)) + } + + assert.Equal(t, 0, len(algo.messages.proposals)) + assert.Equal(t, 0, len(algo.messages.prevotes)) + assert.Equal(t, 0, len(algo.messages.precommits)) + }) + + t.Run("Line 22: receive a new proposal before timeout expiry", func(t *testing.T) { + }) + + t.Run("Line 28: receive an old proposal before timeout expiry", func(t *testing.T) { + }) +} diff --git a/consensus/tendermint/tendermint.go b/consensus/tendermint/tendermint.go new file mode 100644 index 0000000000..e2d28aa87c --- /dev/null +++ b/consensus/tendermint/tendermint.go @@ -0,0 +1,549 @@ +package tendermint + +import ( + "maps" + "slices" + "sync" + "time" + + "github.com/NethermindEth/juno/core/felt" +) + +type ( + step uint8 + height uint + round uint +) + +const ( + propose step = iota + prevote + precommit +) + +func (s step) String() string { + switch s { + case propose: + return "propose" + case prevote: + return "prevote" + case precommit: + return "precommit" + default: + return "unknown" + } +} + +const ( + maxFutureHeight = height(5) + maxFutureRound = round(5) +) + +type timeoutFn func(r round) time.Duration + +type Addr interface { + // Ethereum Addresses are 20 bytes + ~[20]byte | felt.Felt +} + +type Hash interface { + ~[32]byte | felt.Felt +} + +// Hashable's Hash() is used as ID() +type Hashable[H Hash] interface { + Hash() H +} + +type Application[V Hashable[H], H Hash] interface { + // Value returns the value to the Tendermint consensus algorith which can be proposed to other validators. + Value() V + + // Valid returns true if the provided value is valid according to the application context. + Valid(V) bool +} + +type Blockchain[V Hashable[H], H Hash, A Addr] interface { + // Height return the current blockchain height + Height() height + + // Commit is called by Tendermint when a block has been decided on and can be committed to the DB. + Commit(height, V, []Precommit[H, A]) +} + +type Validators[A Addr] interface { + // TotalVotingPower represents N which is required to calculate the thresholds. + TotalVotingPower(height) uint + + // ValidatorVotingPower returns the voting power of the a single validator. This is also required to implement + // various thresholds. The assumption is that a single validator cannot have voting power more than f. + ValidatorVotingPower(A) uint + + // Proposer returns the proposer of the current round and height. + Proposer(height, round) A +} + +type Slasher[M Message[V, H, A], V Hashable[H], H Hash, A Addr] interface { + // Equivocation informs the slasher that a validator has sent conflicting messages. 
Thus it can decide whether to + // slash the validator and by how much. + Equivocation(msgs ...M) +} + +type Listener[M Message[V, H, A], V Hashable[H], H Hash, A Addr] interface { + // Listen would return consensus messages to Tendermint which are set by the validator set. + Listen() <-chan M +} + +type Broadcaster[M Message[V, H, A], V Hashable[H], H Hash, A Addr] interface { + // Broadcast will broadcast the message to the whole validator set. The function should not be blocking. + Broadcast(M) + + // SendMsg would send a message to a specific validator. This would be required for helping send resquest and + // response message to help a specifc validator to catch up. + SendMsg(A, M) +} + +type Listeners[V Hashable[H], H Hash, A Addr] struct { + ProposalListener Listener[Proposal[V, H, A], V, H, A] + PrevoteListener Listener[Prevote[H, A], V, H, A] + PrecommitListener Listener[Precommit[H, A], V, H, A] +} + +type Broadcasters[V Hashable[H], H Hash, A Addr] struct { + ProposalBroadcaster Broadcaster[Proposal[V, H, A], V, H, A] + PrevoteBroadcaster Broadcaster[Prevote[H, A], V, H, A] + PrecommitBroadcaster Broadcaster[Precommit[H, A], V, H, A] +} + +type Tendermint[V Hashable[H], H Hash, A Addr] struct { + nodeAddr A + + state state[V, H] // Todo: Does state need to be protected? + + messages messages[V, H, A] + futureMessages messages[V, H, A] + + futureMessagesMu *sync.Mutex + + timeoutPropose timeoutFn + timeoutPrevote timeoutFn + timeoutPrecommit timeoutFn + + application Application[V, H] + blockchain Blockchain[V, H, A] + validators Validators[A] + + listeners Listeners[V, H, A] + broadcasters Broadcasters[V, H, A] + + scheduledTms map[timeout]*time.Timer + timeoutsCh chan timeout + + // Future round messages are sent to the loop through the following channels + proposalsCh chan Proposal[V, H, A] + prevotesCh chan Prevote[H, A] + precommitsCh chan Precommit[H, A] + + wg sync.WaitGroup + quit chan struct{} +} + +type state[V Hashable[H], H Hash] struct { + h height + r round + s step + + lockedValue *V + lockedRound int + validValue *V + validRound int + + // The following are round level variable therefore when a round changes they must be reset. 
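+	// startRound clears them whenever it moves to a new round.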
+ timeoutPrevoteScheduled bool // line34 for the first time condition + timeoutPrecommitScheduled bool // line47 for the first time condition + lockedValueAndOrValidValueSet bool // line36 for the first time condition +} + +func New[V Hashable[H], H Hash, A Addr](nodeAddr A, app Application[V, H], chain Blockchain[V, H, A], vals Validators[A], + listeners Listeners[V, H, A], broadcasters Broadcasters[V, H, A], tmPropose, tmPrevote, tmPrecommit timeoutFn, +) *Tendermint[V, H, A] { + return &Tendermint[V, H, A]{ + nodeAddr: nodeAddr, + state: state[V, H]{ + h: chain.Height(), + lockedRound: -1, + validRound: -1, + }, + messages: newMessages[V, H, A](), + futureMessages: newMessages[V, H, A](), + futureMessagesMu: &sync.Mutex{}, + timeoutPropose: tmPropose, + timeoutPrevote: tmPrevote, + timeoutPrecommit: tmPrecommit, + application: app, + blockchain: chain, + validators: vals, + listeners: listeners, + broadcasters: broadcasters, + scheduledTms: make(map[timeout]*time.Timer), + timeoutsCh: make(chan timeout), + proposalsCh: make(chan Proposal[V, H, A]), + prevotesCh: make(chan Prevote[H, A]), + precommitsCh: make(chan Precommit[H, A]), + quit: make(chan struct{}), + } +} + +func (t *Tendermint[V, H, A]) Start() { + t.wg.Add(1) + go func() { + defer t.wg.Done() + + t.startRound(0) + + // Todo: check message signature everytime a message is received. + // For the time being it can be assumed the signature is correct. + + for { + select { + case <-t.quit: + return + case tm := <-t.timeoutsCh: + // Handling of timeouts is priorities over messages + switch tm.s { + case propose: + t.OnTimeoutPropose(tm.h, tm.r) + case prevote: + t.OnTimeoutPrevote(tm.h, tm.r) + case precommit: + t.OnTimeoutPrecommit(tm.h, tm.r) + } + delete(t.scheduledTms, tm) + case p := <-t.proposalsCh: + t.handleProposal(p) + case p := <-t.listeners.ProposalListener.Listen(): + t.handleProposal(p) + case p := <-t.prevotesCh: + t.handlePrevote(p) + case p := <-t.listeners.PrevoteListener.Listen(): + t.handlePrevote(p) + case p := <-t.precommitsCh: + t.handlePrecommit(p) + case p := <-t.listeners.PrecommitListener.Listen(): + t.handlePrecommit(p) + } + } + }() +} + +func (t *Tendermint[V, H, A]) Stop() { + close(t.quit) + t.wg.Wait() + for _, tm := range t.scheduledTms { + tm.Stop() + } +} + +func (t *Tendermint[V, H, A]) startRound(r round) { + if r != 0 && r <= t.state.r { + return + } + + t.state.r = r + t.state.s = propose + + t.state.timeoutPrevoteScheduled = false + t.state.lockedValueAndOrValidValueSet = false + t.state.timeoutPrecommitScheduled = false + + if p := t.validators.Proposer(t.state.h, r); p == t.nodeAddr { + var proposalValue *V + if t.state.validValue != nil { + proposalValue = t.state.validValue + } else { + v := t.application.Value() + proposalValue = &v + } + proposalMessage := Proposal[V, H, A]{ + H: t.state.h, + R: r, + ValidRound: t.state.validRound, + Value: proposalValue, + Sender: t.nodeAddr, + } + + t.messages.addProposal(proposalMessage) + t.broadcasters.ProposalBroadcaster.Broadcast(proposalMessage) + } else { + t.scheduleTimeout(t.timeoutPropose(r), propose, t.state.h, t.state.r) + } + + go t.processFutureMessages(t.state.h, t.state.r) +} + +//nolint:gocyclo +func (t *Tendermint[V, H, A]) processFutureMessages(h height, r round) { + t.futureMessagesMu.Lock() + defer t.futureMessagesMu.Unlock() + + proposals, prevotes, precommits := t.futureMessages.allMessages(h, r) + if len(proposals) > 0 { + for _, addrProposals := range proposals { + for _, proposal := range addrProposals { + select { + case 
<-t.quit: + return + case t.proposalsCh <- proposal: + } + } + } + } + + if len(prevotes) > 0 { + for _, addrPrevotes := range prevotes { + for _, vote := range addrPrevotes { + select { + case <-t.quit: + return + case t.prevotesCh <- vote: + } + } + } + } + + if len(precommits) > 0 { + for _, addrPrecommits := range precommits { + for _, vote := range addrPrecommits { + select { + case <-t.quit: + return + case t.precommitsCh <- vote: + } + } + } + } + + t.futureMessages.deleteRoundMessages(h, r) +} + +type timeout struct { + s step + h height + r round +} + +func (t *Tendermint[V, H, A]) scheduleTimeout(duration time.Duration, s step, h height, r round) { + tm := timeout{s: s, h: h, r: r} + t.scheduledTms[tm] = time.AfterFunc(duration, func() { + select { + case <-t.quit: + case t.timeoutsCh <- tm: + } + }) +} + +func (t *Tendermint[_, H, A]) OnTimeoutPropose(h height, r round) { + if t.state.h == h && t.state.r == r && t.state.s == propose { + vote := Prevote[H, A]{ + H: t.state.h, + R: t.state.r, + ID: nil, + Sender: t.nodeAddr, + } + t.messages.addPrevote(vote) + t.broadcasters.PrevoteBroadcaster.Broadcast(vote) + t.state.s = prevote + } +} + +func (t *Tendermint[_, H, A]) OnTimeoutPrevote(h height, r round) { + if t.state.h == h && t.state.r == r && t.state.s == prevote { + vote := Precommit[H, A]{ + H: t.state.h, + R: t.state.r, + ID: nil, + Sender: t.nodeAddr, + } + t.messages.addPrecommit(vote) + t.broadcasters.PrecommitBroadcaster.Broadcast(vote) + t.state.s = precommit + } +} + +func (t *Tendermint[_, _, _]) OnTimeoutPrecommit(h height, r round) { + if t.state.h == h && t.state.r == r { + t.startRound(r + 1) + } +} + +// line55 assumes the caller has acquired a mutex for accessing future messages. +/* + 55: upon f + 1 {∗, h_p, round, ∗, ∗} with round > round_p do + 56: StartRound(round) +*/ +func (t *Tendermint[V, H, A]) line55(futureR round) { + t.futureMessagesMu.Lock() + + vals := make(map[A]struct{}) + proposals, prevotes, precommits := t.futureMessages.allMessages(t.state.h, futureR) + + // If a validator has sent proposl, prevote and precommit from a future round then it will only be counted once. + for addr := range proposals { + vals[addr] = struct{}{} + } + + for addr := range prevotes { + vals[addr] = struct{}{} + } + + for addr := range precommits { + vals[addr] = struct{}{} + } + + t.futureMessagesMu.Unlock() + + if t.validatorSetVotingPower(slices.Collect(maps.Keys(vals))) > f(t.validators.TotalVotingPower(t.state.h)) { + t.startRound(futureR) + } +} + +func (t *Tendermint[V, H, A]) validatorSetVotingPower(vals []A) uint { + var totalVotingPower uint + for _, v := range vals { + totalVotingPower += t.validators.ValidatorVotingPower(v) + } + return totalVotingPower +} + +// Todo: add separate unit tests to check f and q thresholds. +func f(totalVotingPower uint) uint { + // note: integer division automatically floors the result as it return the quotient. + return (totalVotingPower - 1) / 3 +} + +func q(totalVotingPower uint) uint { + // Unfortunately there is no ceiling function for integers in go. 
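+	// q computes ceil(2N/3): e.g. N = 4 gives d = 8, 8/3 = 2 remainder 2, so q = 3; N = 6 gives d = 12, so q = 4.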
+ d := totalVotingPower * 2 + q := d / 3 + r := d % 3 + if r > 0 { + q++ + } + return q +} + +func handleFutureRoundMessage[H Hash, A Addr, V Hashable[H], M Message[V, H, A]]( + t *Tendermint[V, H, A], + m M, + r func(M) round, + addMessage func(M), +) bool { + mR := r(m) + if mR > t.state.r { + if mR-t.state.r > maxFutureRound { + return false + } + + t.futureMessagesMu.Lock() + addMessage(m) + t.futureMessagesMu.Unlock() + + t.line55(mR) + return false + } + return true +} + +func handleFutureHeightMessage[H Hash, A Addr, V Hashable[H], M Message[V, H, A]]( + t *Tendermint[V, H, A], + m M, + h func(M) height, + r func(M) round, + addMessage func(M), +) bool { + mH := h(m) + mR := r(m) + + if mH > t.state.h { + if mH-t.state.h > maxFutureHeight { + return false + } + + if mR > maxFutureRound { + return false + } + + t.futureMessagesMu.Lock() + addMessage(m) + t.futureMessagesMu.Unlock() + return false + } + return true +} + +func checkForQuorumPrecommit[H Hash, A Addr](precommitsForHR map[A][]Precommit[H, A], vID H) ([]Precommit[H, A], []A) { + var precommits []Precommit[H, A] + var vals []A + + for addr, valPrecommits := range precommitsForHR { + for _, p := range valPrecommits { + if *p.ID == vID { + precommits = append(precommits, p) + vals = append(vals, addr) + } + } + } + return precommits, vals +} + +func checkForQuorumPrevotesGivenPrevote[H Hash, A Addr](p Prevote[H, A], prevotesForHR map[A][]Prevote[H, A]) []A { + var vals []A + for addr, valPrevotes := range prevotesForHR { + for _, v := range valPrevotes { + if *v.ID == *p.ID { + vals = append(vals, addr) + } + } + } + return vals +} + +func checkQuorumPrevotesGivenProposalVID[H Hash, A Addr](prevotesForHVr map[A][]Prevote[H, A], vID H) []A { + var vals []A + for addr, valPrevotes := range prevotesForHVr { + for _, p := range valPrevotes { + if *p.ID == vID { + vals = append(vals, addr) + } + } + } + return vals +} + +func (t *Tendermint[V, H, A]) checkForMatchingProposalGivenPrecommit(p Precommit[H, A], + proposalsForHR map[A][]Proposal[V, H, A], +) *Proposal[V, H, A] { + var proposal *Proposal[V, H, A] + + for _, prop := range proposalsForHR[t.validators.Proposer(p.H, p.R)] { + if (*prop.Value).Hash() == *p.ID { + propCopy := prop + proposal = &propCopy + } + } + return proposal +} + +func (t *Tendermint[V, H, A]) checkForMatchingProposalGivenPrevote(p Prevote[H, A], + proposalsForHR map[A][]Proposal[V, H, A], +) *Proposal[V, H, A] { + var proposal *Proposal[V, H, A] + + for _, v := range proposalsForHR[t.validators.Proposer(p.H, p.R)] { + if (*v.Value).Hash() == *p.ID { + vCopy := v + proposal = &vCopy + } + } + return proposal +} diff --git a/consensus/tendermint/tendermint_test.go b/consensus/tendermint/tendermint_test.go new file mode 100644 index 0000000000..a1dfd36a6d --- /dev/null +++ b/consensus/tendermint/tendermint_test.go @@ -0,0 +1,277 @@ +package tendermint + +import ( + "testing" + "time" + + "github.com/NethermindEth/juno/core/felt" + "github.com/NethermindEth/juno/utils" + "github.com/stretchr/testify/assert" +) + +// Implements Hashable interface +type value uint64 + +func (t value) Hash() felt.Felt { + return *new(felt.Felt).SetUint64(uint64(t)) +} + +// Implements Application[value, felt.Felt] interface +type app struct { + cur value +} + +func newApp() *app { return &app{} } + +func (a *app) Value() value { + a.cur = (a.cur + 1) % 100 + return a.cur +} + +func (a *app) Valid(v value) bool { + return v < 100 +} + +// Implements Blockchain[value, felt.Felt] interface +type chain struct { + curHeight height + 
decision map[height]value + decisionCertificates map[height][]Precommit[felt.Felt, felt.Felt] +} + +func newChain() *chain { + return &chain{ + decision: make(map[height]value), + decisionCertificates: make(map[height][]Precommit[felt.Felt, felt.Felt]), + } +} + +func (c *chain) Height() height { + return c.curHeight +} + +func (c *chain) Commit(h height, v value, precommits []Precommit[felt.Felt, felt.Felt]) { + c.decision[c.curHeight] = v + c.decisionCertificates[c.curHeight] = precommits + c.curHeight++ +} + +// Implements Validators[felt.Felt] interface +type validators struct { + totalVotingPower uint + vals []felt.Felt +} + +func newVals() *validators { return &validators{} } + +func (v *validators) TotalVotingPower(h height) uint { + return v.totalVotingPower +} + +func (v *validators) ValidatorVotingPower(validatorAddr felt.Felt) uint { + return 1 +} + +// Proposer is implements round robin +func (v *validators) Proposer(h height, r round) felt.Felt { + i := (uint(h) + uint(r)) % v.totalVotingPower + return v.vals[i] +} + +func (v *validators) addValidator(addr felt.Felt) { + v.vals = append(v.vals, addr) + v.totalVotingPower++ +} + +// Implements Listener[M Message[V, H], V Hashable[H], H Hash] and Broadcasters[V Hashable[H], H Hash, A Addr] interface +type senderAndReceiver[M Message[V, H, A], V Hashable[H], H Hash, A Addr] struct { + mCh chan M +} + +func (r *senderAndReceiver[M, _, _, _]) send(m M) { + r.mCh <- m +} + +func (r *senderAndReceiver[M, _, _, _]) Listen() <-chan M { + return r.mCh +} + +func (r *senderAndReceiver[M, _, _, _]) Broadcast(msg M) { r.mCh <- msg } + +func (r *senderAndReceiver[M, _, _, A]) SendMsg(validatorAddr A, msg M) {} + +func newSenderAndReceiver[M Message[V, H, A], V Hashable[H], H Hash, A Addr]() *senderAndReceiver[M, V, H, A] { + return &senderAndReceiver[M, V, H, A]{mCh: make(chan M, 10)} +} + +func testListenersAndBroadcasters() (Listeners[value, felt.Felt, felt.Felt], Broadcasters[value, + felt.Felt, felt.Felt], +) { + listeners := Listeners[value, felt.Felt, felt.Felt]{ + ProposalListener: newSenderAndReceiver[Proposal[value, felt.Felt, felt.Felt], value, felt.Felt, felt.Felt](), + PrevoteListener: newSenderAndReceiver[Prevote[felt.Felt, felt.Felt], value, felt.Felt, felt.Felt](), + PrecommitListener: newSenderAndReceiver[Precommit[felt.Felt, felt.Felt], value, felt.Felt, felt.Felt](), + } + + broadcasters := Broadcasters[value, felt.Felt, felt.Felt]{ + ProposalBroadcaster: newSenderAndReceiver[Proposal[value, felt.Felt, felt.Felt], value, felt.Felt, felt.Felt](), + PrevoteBroadcaster: newSenderAndReceiver[Prevote[felt.Felt, felt.Felt], value, felt.Felt, felt.Felt](), + PrecommitBroadcaster: newSenderAndReceiver[Precommit[felt.Felt, felt.Felt], value, felt.Felt, felt.Felt](), + } + + return listeners, broadcasters +} + +func TestStartRound(t *testing.T) { + nodeAddr := new(felt.Felt).SetBytes([]byte("my node address")) + val2, val3, val4 := new(felt.Felt).SetUint64(2), new(felt.Felt).SetUint64(3), new(felt.Felt).SetUint64(4) + + tm := func(r round) time.Duration { return time.Duration(r) * time.Nanosecond } + + t.Run("node is the proposer", func(t *testing.T) { + listeners, broadcasters := testListenersAndBroadcasters() + app, chain, vals := newApp(), newChain(), newVals() + + vals.addValidator(*nodeAddr) + vals.addValidator(*val2) + vals.addValidator(*val3) + vals.addValidator(*val4) + + algo := New[value, felt.Felt, felt.Felt](*nodeAddr, app, chain, vals, listeners, broadcasters, tm, tm, tm) + + expectedHeight, expectedRound := height(0), 
round(0) + expectedProposalMsg := Proposal[value, felt.Felt, felt.Felt]{ + H: 0, + R: 0, + ValidRound: -1, + Value: utils.HeapPtr(app.cur + 1), + Sender: *nodeAddr, + } + + proposalBroadcaster := broadcasters.ProposalBroadcaster.(*senderAndReceiver[Proposal[value, felt.Felt, + felt.Felt], value, felt.Felt, felt.Felt]) + + algo.Start() + algo.Stop() + + proposal := <-proposalBroadcaster.mCh + + assert.Equal(t, expectedProposalMsg, proposal) + assert.Equal(t, 1, len(algo.messages.proposals[expectedHeight][expectedRound][*nodeAddr])) + assert.Equal(t, expectedProposalMsg, algo.messages.proposals[expectedHeight][expectedRound][*nodeAddr][0]) + + assert.Equal(t, propose, algo.state.s) + assert.Equal(t, expectedHeight, algo.state.h) + assert.Equal(t, expectedRound, algo.state.r) + }) + + t.Run("node is not the proposer: schedule timeoutPropose", func(t *testing.T) { + listeners, broadcasters := testListenersAndBroadcasters() + app, chain, vals := newApp(), newChain(), newVals() + + vals.addValidator(*val2) + vals.addValidator(*val3) + vals.addValidator(*val4) + vals.addValidator(*nodeAddr) + + algo := New[value, felt.Felt, felt.Felt](*nodeAddr, app, chain, vals, listeners, broadcasters, tm, tm, tm) + + algo.Start() + algo.Stop() + + assert.Equal(t, 1, len(algo.scheduledTms)) + + assert.Contains(t, algo.scheduledTms, timeout{s: propose, h: 0, r: 0}) + + assert.Equal(t, propose, algo.state.s) + assert.Equal(t, height(0), algo.state.h) + assert.Equal(t, round(0), algo.state.r) + }) + + t.Run("OnTimeoutPropose: round zero the node is not the proposer thus send a prevote nil", func(t *testing.T) { + listeners, broadcasters := testListenersAndBroadcasters() + app, chain, vals := newApp(), newChain(), newVals() + // The algo needs to run for a minimum amount of time but it cannot be long enough the state to change + // multiple times. This can happen if the timeouts are too small. 
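+		// Hence only round 0 gets a tiny timeout; later rounds use a long one so no further transitions happen while
+		// the test is asserting.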
+ tm := func(r round) time.Duration { + if r == 0 { + return time.Nanosecond + } + return time.Second + } + + vals.addValidator(*val2) + vals.addValidator(*val3) + vals.addValidator(*val4) + vals.addValidator(*nodeAddr) + + algo := New[value, felt.Felt, felt.Felt](*nodeAddr, app, chain, vals, listeners, broadcasters, tm, tm, tm) + + expectedHeight, expectedRound := height(0), round(0) + expectedPrevoteMsg := Prevote[felt.Felt, felt.Felt]{ + H: 0, + R: 0, + ID: nil, + Sender: *nodeAddr, + } + + prevoteBroadcaster := broadcasters.PrevoteBroadcaster.(*senderAndReceiver[Prevote[felt.Felt, felt.Felt], value, + felt.Felt, felt.Felt]) + + algo.Start() + time.Sleep(time.Millisecond) + algo.Stop() + + prevoteMsg := <-prevoteBroadcaster.mCh + + assert.Equal(t, expectedPrevoteMsg, prevoteMsg) + assert.Equal(t, 1, len(algo.messages.prevotes[expectedHeight][expectedRound][*nodeAddr])) + assert.Equal(t, expectedPrevoteMsg, algo.messages.prevotes[expectedHeight][expectedRound][*nodeAddr][0]) + + assert.Equal(t, prevote, algo.state.s) + assert.Equal(t, expectedHeight, algo.state.h) + assert.Equal(t, expectedRound, algo.state.r) + }) +} + +func TestThresholds(t *testing.T) { + tests := []struct { + n uint + q uint + f uint + }{ + {1, 1, 0}, + {2, 2, 0}, + {3, 2, 0}, + {4, 3, 1}, + {5, 4, 1}, + {6, 4, 1}, + {7, 5, 2}, + {11, 8, 3}, + {15, 10, 4}, + {20, 14, 6}, + {100, 67, 33}, + {150, 100, 49}, + {2000, 1334, 666}, + {2509, 1673, 836}, + {3045, 2030, 1014}, + {7689, 5126, 2562}, + {10032, 6688, 3343}, + {12932, 8622, 4310}, + {15982, 10655, 5327}, + {301234, 200823, 100411}, + {301235, 200824, 100411}, + {301236, 200824, 100411}, + } + + for _, test := range tests { + assert.Equal(t, test.q, q(test.n)) + assert.Equal(t, test.f, f(test.n)) + + assert.True(t, 2*q(test.n) > test.n+f(test.n)) + assert.True(t, 2*(q(test.n)-1) <= test.n+f(test.n)) + } +} + +// Todo: Add tests for round change where existing messages are processed +// Todo: Add malicious test