Skip to content

Commit

Permalink
Leader Read Opt is actually Linearizable
Browse files Browse the repository at this point in the history
  • Loading branch information
otoolep committed Oct 9, 2024
1 parent dfa5894 commit b00eada
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 184 deletions.
2 changes: 1 addition & 1 deletion cluster/proto/message.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

279 changes: 137 additions & 142 deletions command/proto/command.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion command/proto/command.proto
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ message QueryRequest {
QUERY_REQUEST_LEVEL_WEAK = 1;
QUERY_REQUEST_LEVEL_STRONG = 2;
QUERY_REQUEST_LEVEL_AUTO = 3;
QUERY_REQUEST_LEVEL_LINEARIZABLE = 4;
}
Level level = 3;
int64 freshness = 4;
bool freshness_strict = 5;
bool leader_read_opt = 6;
}

message Values {
Expand Down
7 changes: 2 additions & 5 deletions http/query_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,6 @@ func (qp QueryParams) Compress() bool {
return qp.HasKey("compress")
}

// EnableLeaderReadOptimization returns true if the query parameters request leader read optimizations.
func (qp QueryParams) LeaderReadOpt() bool {
return qp.HasKey("leader_read_opt")
}

// Key returns the value of the key named "key".
func (qp QueryParams) Key() string {
return qp["key"]
Expand Down Expand Up @@ -165,6 +160,8 @@ func (qp QueryParams) Level() command.QueryRequest_Level {
return command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
case "auto":
return command.QueryRequest_QUERY_REQUEST_LEVEL_AUTO
case "linearizable":
return command.QueryRequest_QUERY_REQUEST_LEVEL_LINEARIZABLE
default:
return command.QueryRequest_QUERY_REQUEST_LEVEL_WEAK
}
Expand Down
1 change: 0 additions & 1 deletion http/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1260,7 +1260,6 @@ func (s *Service) handleQuery(w http.ResponseWriter, r *http.Request, qp QueryPa
Level: qp.Level(),
Freshness: qp.Freshness().Nanoseconds(),
FreshnessStrict: qp.FreshnessStrict(),
LeaderReadOpt: qp.LeaderReadOpt(),
}

results, resultsErr := s.store.Query(qr)
Expand Down
35 changes: 24 additions & 11 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1111,7 +1111,7 @@ func (s *Store) execute(ex *proto.ExecuteRequest) ([]*proto.ExecuteQueryResponse
}

// Query executes queries that return rows, and do not modify the database.
func (s *Store) Query(qr *proto.QueryRequest) ([]*proto.QueryRows, error) {
func (s *Store) Query(qr *proto.QueryRequest) (rows []*proto.QueryRows, retErr error) {
p := (*PragmaCheckRequest)(qr.Request)
if err := p.Check(); err != nil {
return nil, err
Expand All @@ -1121,6 +1121,20 @@ func (s *Store) Query(qr *proto.QueryRequest) ([]*proto.QueryRows, error) {
return nil, ErrNotOpen
}

// If linearizable consistency is requested, we will need to check the
// term when query processing completes -- assuming query processing
// proceeded without error.
initTerm := s.fsmTerm.Load()
defer func() {
if retErr == nil && qr.Level == proto.QueryRequest_QUERY_REQUEST_LEVEL_LINEARIZABLE {
if err := s.VerifyLeader(); err != nil {
retErr = err
} else if s.fsmTerm.Load() != initTerm {
retErr = ErrStaleRead
}
}
}()

if qr.Level == proto.QueryRequest_QUERY_REQUEST_LEVEL_AUTO {
qr.Level = proto.QueryRequest_QUERY_REQUEST_LEVEL_WEAK
isVoter, err := s.IsVoter()
Expand All @@ -1132,7 +1146,13 @@ func (s *Store) Query(qr *proto.QueryRequest) ([]*proto.QueryRows, error) {
}
}

if qr.Level == proto.QueryRequest_QUERY_REQUEST_LEVEL_STRONG && !qr.LeaderReadOpt {
if qr.Level == proto.QueryRequest_QUERY_REQUEST_LEVEL_LINEARIZABLE {
if s.raft.State() != raft.Leader {
return nil, ErrNotLeader
}
}

if qr.Level == proto.QueryRequest_QUERY_REQUEST_LEVEL_STRONG {
if s.raft.State() != raft.Leader {
return nil, ErrNotLeader
}
Expand Down Expand Up @@ -1174,21 +1194,14 @@ func (s *Store) Query(qr *proto.QueryRequest) ([]*proto.QueryRows, error) {
return nil, ErrStaleRead
}

if qr.Level == proto.QueryRequest_QUERY_REQUEST_LEVEL_STRONG && qr.LeaderReadOpt {
if err := s.VerifyLeader(); err != nil {
return nil, err
}
}

return s.db.Query(qr.Request, qr.Timings)
}

// VerifyLeader checks that the current node is the Raft leader.
func (s *Store) VerifyLeader() error {
future := s.raft.VerifyLeader()
if err := future.Error(); err != nil {
if err == raft.ErrNotLeader {
return ErrNotLeader
} else if err == raft.ErrLeadershipLost {
if err == raft.ErrNotLeader || err == raft.ErrLeadershipLost {
return ErrNotLeader
}
return fmt.Errorf("failed to verify leader: %s", err.Error())
Expand Down
29 changes: 11 additions & 18 deletions store/store_multi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1267,8 +1267,7 @@ func Test_MultiNodeStoreLogTruncation(t *testing.T) {
}
}

func Test_MultiNodeExecuteQuery_LeaderReadOpt_AllUp(t *testing.T) {
// Set up a 3-node cluster
func Test_MultiNodeExecuteQuery_Linearizable_AllUp(t *testing.T) {
s0, ln0 := mustNewStore(t)
defer ln0.Close()
if err := s0.Open(); err != nil {
Expand Down Expand Up @@ -1316,15 +1315,14 @@ func Test_MultiNodeExecuteQuery_LeaderReadOpt_AllUp(t *testing.T) {
t.Fatalf("failed to execute on leader: %s", err.Error())
}

// Perform a strong read consistency query with leader read optimization on all nodes
// Perform a query with Linearizable optimization on all nodes
var leaderCount int
for _, s := range []*Store{s0, s1, s2} {
if s.IsLeader() {
leaderCount++
}
qr := queryRequestFromString("SELECT * FROM foo", false, false)
qr.Level = proto.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
qr.LeaderReadOpt = true
qr.Level = proto.QueryRequest_QUERY_REQUEST_LEVEL_LINEARIZABLE
r, err := s.Query(qr)
if err != nil {
// if this node is not the leader, it will return an error
Expand Down Expand Up @@ -1352,8 +1350,7 @@ func Test_MultiNodeExecuteQuery_LeaderReadOpt_AllUp(t *testing.T) {
}
}

func Test_MultiNodeExecuteQuery_LeaderReadOpt_Quorum(t *testing.T) {
// Set up a 3-node cluster
func Test_MultiNodeExecuteQuery_Linearizable_Quorum(t *testing.T) {
s0, ln0 := mustNewStore(t)
defer ln0.Close()
if err := s0.Open(); err != nil {
Expand Down Expand Up @@ -1406,15 +1403,14 @@ func Test_MultiNodeExecuteQuery_LeaderReadOpt_Quorum(t *testing.T) {
t.Fatalf("failed to close node: %s", err.Error())
}

// Perform a strong read consistency query with leader read optimization on the remaining nodes
// Perform a query with Linearizable consistency on the remaining nodes
var leaderCount int
for _, s := range []*Store{s0, s1} {
if s.IsLeader() {
leaderCount++
}
qr := queryRequestFromString("SELECT * FROM foo", false, false)
qr.Level = proto.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
qr.LeaderReadOpt = true
qr.Level = proto.QueryRequest_QUERY_REQUEST_LEVEL_LINEARIZABLE
r, err := s.Query(qr)
if err != nil {
// if this node is not the leader, it will return an error
Expand All @@ -1425,7 +1421,7 @@ func Test_MultiNodeExecuteQuery_LeaderReadOpt_Quorum(t *testing.T) {
}
continue
} else {
t.Fatalf("failed to perform query with leader read optimization on quorum: %s", err.Error())
t.Fatalf("failed to perform query with Linearizable consistency on quorum: %s", err.Error())
}
}

Expand All @@ -1442,8 +1438,7 @@ func Test_MultiNodeExecuteQuery_LeaderReadOpt_Quorum(t *testing.T) {
}
}

func Test_MultiNodeExecuteQuery_LeaderReadOpt_NoQuorum(t *testing.T) {
// Set up a 3-node cluster
func Test_MultiNodeExecuteQuery_Linearizable_NoQuorum(t *testing.T) {
s0, ln0 := mustNewStore(t)
defer ln0.Close()
if err := s0.Open(); err != nil {
Expand Down Expand Up @@ -1499,14 +1494,12 @@ func Test_MultiNodeExecuteQuery_LeaderReadOpt_NoQuorum(t *testing.T) {
t.Fatalf("failed to close node: %s", err.Error())
}

// Perform a strong read consistency query with leader read optimization on the remaining nodes (should fail)
// Perform a query with Linearizable consistency on the remaining nodes
// which should fail.
qr := queryRequestFromString("SELECT * FROM foo", false, false)
qr.Level = proto.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
qr.LeaderReadOpt = true
qr.Level = proto.QueryRequest_QUERY_REQUEST_LEVEL_LINEARIZABLE

// s0 should not provide read index availability
_, err = s0.Query(qr)

if err == nil {
t.Fatalf("expected query to fail, but it did not")
}
Expand Down
9 changes: 4 additions & 5 deletions store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -853,9 +853,9 @@ func Test_SingleNodeExecuteQuery(t *testing.T) {
}
}

// Test_SingleNodeExecuteQuery_LeaderReadOpt tests that a Store correctly responds to a simple
// Execute and Query request with the LeaderReadOpt flag set.
func Test_SingleNodeExecuteQuery_LeaderReadOpt(t *testing.T) {
// Test_SingleNodeExecuteQuery_Linearizable tests that a Store correctly responds to a
// simple Query request with Linearizable consistency level.
func Test_SingleNodeExecuteQuery_Linearizable(t *testing.T) {
s, ln := mustNewStore(t)
defer ln.Close()

Expand All @@ -880,8 +880,7 @@ func Test_SingleNodeExecuteQuery_LeaderReadOpt(t *testing.T) {
}

qr := queryRequestFromString("SELECT * FROM foo", false, false)
qr.Level = proto.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
qr.LeaderReadOpt = true
qr.Level = proto.QueryRequest_QUERY_REQUEST_LEVEL_LINEARIZABLE
r, err := s.Query(qr)
if err != nil {
t.Fatalf("failed to perform strong query on single node: %s", err.Error())
Expand Down

0 comments on commit b00eada

Please sign in to comment.