Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions candidates/siddhantprateek/mirbft/mirbft-sample/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ func (mb *MirBFT) Propose(request Request) {
Request: request,
}

mb.Mutex.Lock()
mb.Pending[hash] = proposal
mb.Mutex.Unlock()

// Broadcast the proposal to other nodes
for _, recipient := range mb.Leaders {
Expand All @@ -92,13 +94,17 @@ func (mb *MirBFT) Propose(request Request) {
}

func (mb *MirBFT) ProcessProposal(proposal Proposal) {
mb.Mutex.Lock()
if mb.Received[proposal.LeaderID] == nil {
mb.Received[proposal.LeaderID] = make(map[string]bool)
}
mb.Received[proposal.LeaderID][proposal.Hash] = true

enoughProposals := len(mb.Received[proposal.LeaderID]) >= (mb.NumNodes/2)+1
mb.Mutex.Unlock()

// Check if there are enough unique proposals received from leaders
if len(mb.Received[proposal.LeaderID]) >= (mb.NumNodes/2)+1 {
if enoughProposals {
// Send acknowledgment to the leader
ackMsg := Message{
SenderID: mb.NodeID,
Expand All @@ -114,13 +120,15 @@ func (mb *MirBFT) ProcessProposal(proposal Proposal) {
}

func (mb *MirBFT) Commit(hash string) {
if _, ok := mb.Pending[hash]; ok && !mb.Committed[hash] {
// Execute the request associated with the proposal
request := mb.Pending[hash].Request
ExecuteRequest(request)

// Mark the proposal as committed
mb.Mutex.Lock()
prop, exists := mb.Pending[hash]
isCommitted := mb.Committed[hash]
if exists && !isCommitted {
mb.Committed[hash] = true
mb.Mutex.Unlock()

// Execute the request associated with the proposal
ExecuteRequest(prop.Request)

// Broadcast the commit message to all nodes
commitMsg := Message{
Expand All @@ -130,6 +138,8 @@ func (mb *MirBFT) Commit(hash string) {
}
// Send the commit message to all nodes
SendMessage(commitMsg)
} else {
mb.Mutex.Unlock()
}
}

Expand Down
69 changes: 69 additions & 0 deletions candidates/siddhantprateek/mirbft/mirbft-sample/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package main

import (
"fmt"
"sync"
"testing"
)

func TestMirBFTConcurrency(t *testing.T) {
numNodes := 4
mb := NewMirBFT(0, numNodes)

// We will spawn multiple goroutines to propose requests concurrently
// and verify that map access is fully synchronized without causing panic.
var wg sync.WaitGroup
numConcurrent := 50

for i := 0; i < numConcurrent; i++ {
wg.Add(1)
go func(clientId int) {
defer wg.Done()
req := Request{
ClientID: clientId,
Data: fmt.Sprintf("Transaction data %d", clientId),
Signature: fmt.Sprintf("sig-%d", clientId),
}
mb.Propose(req)
}(i)
}

wg.Wait()

mb.Mutex.Lock()
pendingLen := len(mb.Pending)
mb.Mutex.Unlock()

if pendingLen != numConcurrent {
t.Errorf("Expected %d pending proposals, got %d", numConcurrent, pendingLen)
}
}

func TestMirBFTProcessProposalConcurrency(t *testing.T) {
numNodes := 4
mb := NewMirBFT(0, numNodes)

// Pre-fill pending proposals
req := Request{ClientID: 1, Data: "data", Signature: "sig"}
hash := Hash(req)
mb.Pending[hash] = Proposal{LeaderID: 1, Hash: hash, Request: req}

var wg sync.WaitGroup
numConcurrent := 50

// Simulating multiple concurrent processing calls for proposals from different leaders
for i := 0; i < numConcurrent; i++ {
wg.Add(1)
go func(leaderId int) {
defer wg.Done()
prop := Proposal{
LeaderID: leaderId,
Hash: hash,
Request: req,
}
mb.ProcessProposal(prop)
}(i % numNodes)
}

wg.Wait()
}