Skip to content

Commit

Permalink
Node can retrieve commit index of other node
Browse files Browse the repository at this point in the history
  • Loading branch information
otoolep committed Sep 29, 2024
1 parent 2962ad6 commit af864f7
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 0 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## v8.31.3 (unreleased)
### Implementation changes and bug fixes
- [PR #1923](https://github.com/rqlite/rqlite/pull/1923): Node can retrieve commit index of other node.

## v8.31.2 (September 28th 2024)
### Implementation changes and bug fixes
- [PR #1915](https://github.com/rqlite/rqlite/pull/1915): Move VACUUM-related testing to its own files.
Expand Down
20 changes: 20 additions & 0 deletions cluster/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,26 @@ func (c *Client) GetNodeAPIAddr(nodeAddr string, retries int, timeout time.Durat
return a.Url, nil
}

// GetCommitIndex retrieves the commit index for the node at nodeAddr
func (c *Client) GetCommitIndex(nodeAddr string, retries int, timeout time.Duration) (uint64, error) {
command := &proto.Command{
Type: proto.Command_COMMAND_TYPE_GET_NODE_API_URL,
}
p, nr, err := c.retry(command, nodeAddr, timeout, retries)
stats.Add(numGetNodeAPIRequestRetries, int64(nr))
if err != nil {
return 0, err
}

a := &proto.NodeMeta{}
err = pb.Unmarshal(p, a)
if err != nil {
return 0, fmt.Errorf("protobuf unmarshal: %w", err)
}

return a.CommitIndex, nil
}

// Execute performs an Execute on a remote node. If username is an empty string
// no credential information will be included in the Execute request to the
// remote node.
Expand Down
36 changes: 36 additions & 0 deletions cluster/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,42 @@ func Test_ClientGetNodeAPIAddr(t *testing.T) {
}
}

func Test_ClientGetCommitIndex(t *testing.T) {
srv := servicetest.NewService()
srv.Handler = func(conn net.Conn) {
var p []byte
var err error
c := readCommand(conn)
if c == nil {
// Error on connection, so give up, as normal
// test exit can cause that too.
return
}
if c.Type != proto.Command_COMMAND_TYPE_GET_NODE_API_URL {
t.Fatalf("unexpected command type: %d", c.Type)
}
p, err = pb.Marshal(&proto.NodeMeta{
CommitIndex: 5678,
})
if err != nil {
conn.Close()
}
writeBytesWithLength(conn, p)
}
srv.Start()
defer srv.Close()

c := NewClient(&simpleDialer{}, 0)
idx, err := c.GetCommitIndex(srv.Addr(), noRetries, time.Second)
if err != nil {
t.Fatal(err)
}
exp, got := uint64(5678), idx
if exp != got {
t.Fatalf("unexpected addr, got %d, exp: %d", got, exp)
}
}

func Test_ClientExecute(t *testing.T) {
srv := servicetest.NewService()
srv.Handler = func(conn net.Conn) {
Expand Down
29 changes: 29 additions & 0 deletions cluster/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,35 @@ func Test_NewServiceSetGetNodeAPIAddrTLS(t *testing.T) {
}
}

func Test_NewServiceGetCommitIndex(t *testing.T) {
ml := mustNewMockTransport()
mgr := mustNewMockManager()
s := New(ml, mustNewMockDatabase(), mgr, mustNewMockCredentialStore())
if s == nil {
t.Fatalf("failed to create cluster service")
}

if err := s.Open(); err != nil {
t.Fatalf("failed to open cluster service")
}
s.EnableHTTPS(true)

// Test fetch via network.
mgr.commitIndex = 1234
c := NewClient(ml, 30*time.Second)
idx, err := c.GetCommitIndex(s.Addr(), noRetries, 5*time.Second)
if err != nil {
t.Fatalf("failed to get node API address: %s", err)
}
if idx != 1234 {
t.Fatalf("failed to get correct node commit index, exp %d, got %d", 1234, idx)
}

if err := s.Close(); err != nil {
t.Fatalf("failed to close cluster service")
}
}

func Test_NewServiceTestExecuteQueryAuthNoCredentials(t *testing.T) {
ml := mustNewMockTransport()
db := mustNewMockDatabase()
Expand Down

0 comments on commit af864f7

Please sign in to comment.