Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 46 additions & 3 deletions candidates/siddhantprateek/paxos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"fmt"
"sync"
"time"
)

type Proposal struct {
Expand All @@ -21,6 +22,7 @@ type Learner struct {
mu sync.Mutex
accepted []Proposal
quorumSize int
decided chan struct{} // Channel to notify when decided
}

type Proposer struct {
Expand All @@ -29,14 +31,18 @@ type Proposer struct {
value interface{}
acceptors []*Acceptor
learners []*Learner
stop chan struct{} // Channel to stop proposing
}

func NewAcceptor() *Acceptor {
return &Acceptor{}
}

func NewLearner(quorumSize int) *Learner {
return &Learner{quorumSize: quorumSize}
return &Learner{
quorumSize: quorumSize,
decided: make(chan struct{}),
}
}

func NewProposer(proposalNum int, value interface{}, acceptors []*Acceptor, learners []*Learner) *Proposer {
Expand All @@ -45,6 +51,18 @@ func NewProposer(proposalNum int, value interface{}, acceptors []*Acceptor, lear
value: value,
acceptors: acceptors,
learners: learners,
stop: make(chan struct{}),
}
}

func (p *Proposer) Stop() {
p.mu.Lock()
defer p.mu.Unlock()
select {
case <-p.stop:
// Already stopped
default:
close(p.stop)
}
}

Expand Down Expand Up @@ -74,14 +92,25 @@ func (l *Learner) ReceiveAccepted(prop Proposal) bool {
defer l.mu.Unlock()
l.accepted = append(l.accepted, prop)
if len(l.accepted) >= l.quorumSize {
select {
case <-l.decided:
// Already closed
default:
close(l.decided)
}
return true
} else {
return false
}
return false
}

func (p *Proposer) Propose() {
for {
select {
case <-p.stop:
return
default:
}

p.mu.Lock()
n := p.proposalNum
p.mu.Unlock()
Expand Down Expand Up @@ -109,6 +138,8 @@ func (p *Proposer) Propose() {
p.mu.Lock()
p.proposalNum++
p.mu.Unlock()
// Wait/backoff briefly before next proposal round to reduce contention
time.Sleep(10 * time.Millisecond)
} else {
p.mu.Lock()
p.proposalNum++
Expand All @@ -127,8 +158,20 @@ func (p *Proposer) Propose() {
for _, learner := range p.learners {
learner.ReceiveAccepted(Proposal{Number: n, Value: p.value, Decided: true})
}
// Consensus has been reached successfully, terminate proposal loop.
return
} else {
// Failed accept phase: backoff briefly
time.Sleep(10 * time.Millisecond)
}
}
} else {
// Prepare phase failed to reach quorum.
// Increment proposal number to prepare for next round and sleep to avoid busy spin.
p.mu.Lock()
p.proposalNum++
p.mu.Unlock()
time.Sleep(10 * time.Millisecond)
}
}
}
Expand Down
35 changes: 30 additions & 5 deletions candidates/siddhantprateek/paxos/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,28 @@ func TestPaxosConsensus(t *testing.T) {
learners[i] = NewLearner(numAcceptors/2 + 1)
}

proposers := make([]*Proposer, numProposers)
for i := 0; i < numProposers; i++ {
proposer := NewProposer(i, fmt.Sprintf("Value from Proposer %d", i), acceptors, learners)
go proposer.Propose()
// Initialize proposers starting at proposalNum >= 1 to exceed acceptor's promisedNum (0)
proposers[i] = NewProposer(i+1, fmt.Sprintf("Value from Proposer %d", i), acceptors, learners)
go proposers[i].Propose()
}

// Wait for all learners to decide using event channel with a timeout
for i, learner := range learners {
select {
case <-learner.decided:
// Consensus reached for this learner
case <-time.After(2 * time.Second):
t.Fatalf("Timeout waiting for learner %d to decide", i)
}
}

// Clean up proposers
for _, proposer := range proposers {
proposer.Stop()
}

time.Sleep(1 * time.Second)
for _, learner := range learners {
if len(learner.accepted) == 0 {
t.Errorf("Learner did not receive any accepted proposals.")
Expand All @@ -42,10 +58,19 @@ func TestPaxosConsensus(t *testing.T) {
func TestPaxosProposer(t *testing.T) {
acceptor := NewAcceptor()
learner := NewLearner(1)
proposer := NewProposer(0, "Test Value", []*Acceptor{acceptor}, []*Learner{learner})
// Proposer proposalNum must start at >= 1 to be accepted by the acceptor (promisedNum starts at 0)
proposer := NewProposer(1, "Test Value", []*Acceptor{acceptor}, []*Learner{learner})

go proposer.Propose()
time.Sleep(100 * time.Millisecond)
defer proposer.Stop()

// Wait for learner to decide
select {
case <-learner.decided:
// Decided successfully
case <-time.After(2 * time.Second):
t.Fatalf("Timeout waiting for learner to decide")
}

if len(learner.accepted) != 1 {
t.Errorf("Learner did not receive the accepted proposal.")
Expand Down