From af864f7ae5f4ebc531b3bf159fbea83654d80fa4 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sun, 29 Sep 2024 18:51:19 -0400 Subject: [PATCH] Node can retrieve commit index of other node --- CHANGELOG.md | 4 ++++ cluster/client.go | 20 ++++++++++++++++++++ cluster/client_test.go | 36 ++++++++++++++++++++++++++++++++++++ cluster/service_test.go | 29 +++++++++++++++++++++++++++++ 4 files changed, 89 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a1a83d996..5c2d4c39d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## v8.31.3 (unreleased) +### Implementation changes and bug fixes +- [PR #1923](https://github.com/rqlite/rqlite/pull/1923): Node can retrieve commit index of other node. + ## v8.31.2 (September 28th 2024) ### Implementation changes and bug fixes - [PR #1915](https://github.com/rqlite/rqlite/pull/1915): Move VACUUM-related testing to its own files. diff --git a/cluster/client.go b/cluster/client.go index 341fbcb26..10e3d7c3a 100644 --- a/cluster/client.go +++ b/cluster/client.go @@ -125,6 +125,26 @@ func (c *Client) GetNodeAPIAddr(nodeAddr string, retries int, timeout time.Durat return a.Url, nil } +// GetCommitIndex retrieves the commit index for the node at nodeAddr +func (c *Client) GetCommitIndex(nodeAddr string, retries int, timeout time.Duration) (uint64, error) { + command := &proto.Command{ + Type: proto.Command_COMMAND_TYPE_GET_NODE_API_URL, + } + p, nr, err := c.retry(command, nodeAddr, timeout, retries) + stats.Add(numGetNodeAPIRequestRetries, int64(nr)) + if err != nil { + return 0, err + } + + a := &proto.NodeMeta{} + err = pb.Unmarshal(p, a) + if err != nil { + return 0, fmt.Errorf("protobuf unmarshal: %w", err) + } + + return a.CommitIndex, nil +} + // Execute performs an Execute on a remote node. If username is an empty string // no credential information will be included in the Execute request to the // remote node. diff --git a/cluster/client_test.go b/cluster/client_test.go index 4140164c3..3eabeb97f 100644 --- a/cluster/client_test.go +++ b/cluster/client_test.go @@ -57,6 +57,42 @@ func Test_ClientGetNodeAPIAddr(t *testing.T) { } } +func Test_ClientGetCommitIndex(t *testing.T) { + srv := servicetest.NewService() + srv.Handler = func(conn net.Conn) { + var p []byte + var err error + c := readCommand(conn) + if c == nil { + // Error on connection, so give up, as normal + // test exit can cause that too. + return + } + if c.Type != proto.Command_COMMAND_TYPE_GET_NODE_API_URL { + t.Fatalf("unexpected command type: %d", c.Type) + } + p, err = pb.Marshal(&proto.NodeMeta{ + CommitIndex: 5678, + }) + if err != nil { + conn.Close() + } + writeBytesWithLength(conn, p) + } + srv.Start() + defer srv.Close() + + c := NewClient(&simpleDialer{}, 0) + idx, err := c.GetCommitIndex(srv.Addr(), noRetries, time.Second) + if err != nil { + t.Fatal(err) + } + exp, got := uint64(5678), idx + if exp != got { + t.Fatalf("unexpected addr, got %d, exp: %d", got, exp) + } +} + func Test_ClientExecute(t *testing.T) { srv := servicetest.NewService() srv.Handler = func(conn net.Conn) { diff --git a/cluster/service_test.go b/cluster/service_test.go index 023d40410..2cc071354 100644 --- a/cluster/service_test.go +++ b/cluster/service_test.go @@ -175,6 +175,35 @@ func Test_NewServiceSetGetNodeAPIAddrTLS(t *testing.T) { } } +func Test_NewServiceGetCommitIndex(t *testing.T) { + ml := mustNewMockTransport() + mgr := mustNewMockManager() + s := New(ml, mustNewMockDatabase(), mgr, mustNewMockCredentialStore()) + if s == nil { + t.Fatalf("failed to create cluster service") + } + + if err := s.Open(); err != nil { + t.Fatalf("failed to open cluster service") + } + s.EnableHTTPS(true) + + // Test fetch via network. + mgr.commitIndex = 1234 + c := NewClient(ml, 30*time.Second) + idx, err := c.GetCommitIndex(s.Addr(), noRetries, 5*time.Second) + if err != nil { + t.Fatalf("failed to get node API address: %s", err) + } + if idx != 1234 { + t.Fatalf("failed to get correct node commit index, exp %d, got %d", 1234, idx) + } + + if err := s.Close(); err != nil { + t.Fatalf("failed to close cluster service") + } +} + func Test_NewServiceTestExecuteQueryAuthNoCredentials(t *testing.T) { ml := mustNewMockTransport() db := mustNewMockDatabase()