diff --git a/candidates/siddhantprateek/paxos/main.go b/candidates/siddhantprateek/paxos/main.go index 256789a..4dc2e9b 100644 --- a/candidates/siddhantprateek/paxos/main.go +++ b/candidates/siddhantprateek/paxos/main.go @@ -3,6 +3,7 @@ package main import ( "fmt" "sync" + "time" ) type Proposal struct { @@ -21,6 +22,7 @@ type Learner struct { mu sync.Mutex accepted []Proposal quorumSize int + decided chan struct{} // Channel to notify when decided } type Proposer struct { @@ -29,6 +31,7 @@ type Proposer struct { value interface{} acceptors []*Acceptor learners []*Learner + stop chan struct{} // Channel to stop proposing } func NewAcceptor() *Acceptor { @@ -36,7 +39,10 @@ func NewAcceptor() *Acceptor { } func NewLearner(quorumSize int) *Learner { - return &Learner{quorumSize: quorumSize} + return &Learner{ + quorumSize: quorumSize, + decided: make(chan struct{}), + } } func NewProposer(proposalNum int, value interface{}, acceptors []*Acceptor, learners []*Learner) *Proposer { @@ -45,6 +51,18 @@ func NewProposer(proposalNum int, value interface{}, acceptors []*Acceptor, lear value: value, acceptors: acceptors, learners: learners, + stop: make(chan struct{}), + } +} + +func (p *Proposer) Stop() { + p.mu.Lock() + defer p.mu.Unlock() + select { + case <-p.stop: + // Already stopped + default: + close(p.stop) } } @@ -74,14 +92,25 @@ func (l *Learner) ReceiveAccepted(prop Proposal) bool { defer l.mu.Unlock() l.accepted = append(l.accepted, prop) if len(l.accepted) >= l.quorumSize { + select { + case <-l.decided: + // Already closed + default: + close(l.decided) + } return true - } else { - return false } + return false } func (p *Proposer) Propose() { for { + select { + case <-p.stop: + return + default: + } + p.mu.Lock() n := p.proposalNum p.mu.Unlock() @@ -109,6 +138,8 @@ func (p *Proposer) Propose() { p.mu.Lock() p.proposalNum++ p.mu.Unlock() + // Wait/backoff briefly before next proposal round to reduce contention + time.Sleep(10 * time.Millisecond) } else { p.mu.Lock() p.proposalNum++ @@ -127,8 +158,20 @@ func (p *Proposer) Propose() { for _, learner := range p.learners { learner.ReceiveAccepted(Proposal{Number: n, Value: p.value, Decided: true}) } + // Consensus has been reached successfully, terminate proposal loop. + return + } else { + // Failed accept phase: backoff briefly + time.Sleep(10 * time.Millisecond) } } + } else { + // Prepare phase failed to reach quorum. + // Increment proposal number to prepare for next round and sleep to avoid busy spin. + p.mu.Lock() + p.proposalNum++ + p.mu.Unlock() + time.Sleep(10 * time.Millisecond) } } } diff --git a/candidates/siddhantprateek/paxos/main_test.go b/candidates/siddhantprateek/paxos/main_test.go index 38402bd..fe60ffa 100644 --- a/candidates/siddhantprateek/paxos/main_test.go +++ b/candidates/siddhantprateek/paxos/main_test.go @@ -21,12 +21,28 @@ func TestPaxosConsensus(t *testing.T) { learners[i] = NewLearner(numAcceptors/2 + 1) } + proposers := make([]*Proposer, numProposers) for i := 0; i < numProposers; i++ { - proposer := NewProposer(i, fmt.Sprintf("Value from Proposer %d", i), acceptors, learners) - go proposer.Propose() + // Initialize proposers starting at proposalNum >= 1 to exceed acceptor's promisedNum (0) + proposers[i] = NewProposer(i+1, fmt.Sprintf("Value from Proposer %d", i), acceptors, learners) + go proposers[i].Propose() + } + + // Wait for all learners to decide using event channel with a timeout + for i, learner := range learners { + select { + case <-learner.decided: + // Consensus reached for this learner + case <-time.After(2 * time.Second): + t.Fatalf("Timeout waiting for learner %d to decide", i) + } + } + + // Clean up proposers + for _, proposer := range proposers { + proposer.Stop() } - time.Sleep(1 * time.Second) for _, learner := range learners { if len(learner.accepted) == 0 { t.Errorf("Learner did not receive any accepted proposals.") @@ -42,10 +58,19 @@ func TestPaxosConsensus(t *testing.T) { func TestPaxosProposer(t *testing.T) { acceptor := NewAcceptor() learner := NewLearner(1) - proposer := NewProposer(0, "Test Value", []*Acceptor{acceptor}, []*Learner{learner}) + // Proposer proposalNum must start at >= 1 to be accepted by the acceptor (promisedNum starts at 0) + proposer := NewProposer(1, "Test Value", []*Acceptor{acceptor}, []*Learner{learner}) go proposer.Propose() - time.Sleep(100 * time.Millisecond) + defer proposer.Stop() + + // Wait for learner to decide + select { + case <-learner.decided: + // Decided successfully + case <-time.After(2 * time.Second): + t.Fatalf("Timeout waiting for learner to decide") + } if len(learner.accepted) != 1 { t.Errorf("Learner did not receive the accepted proposal.")