From 5c2baa8d17bb1bc8c0d7ed3bb3425d13f8bbe7e7 Mon Sep 17 00:00:00 2001 From: John Sullivan Date: Wed, 7 Oct 2015 14:43:22 -0400 Subject: [PATCH 01/13] Initial attempt to port 'reachability' command from serf to consul. --- command/agent/agent.go | 12 ++ command/agent/ipc_query_response_stream.go | 91 +++++++++++ command/agent/rpc.go | 39 +++++ command/agent/rpc_client.go | 156 ++++++++++++++++++ command/reachability.go | 175 +++++++++++++++++++++ commands.go | 6 + consul/client.go | 17 ++ 7 files changed, 496 insertions(+) create mode 100644 command/agent/ipc_query_response_stream.go create mode 100644 command/reachability.go diff --git a/command/agent/agent.go b/command/agent/agent.go index 14db1ea3de70..f50e4eab4bd8 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -1456,3 +1456,15 @@ func (a *Agent) DisableNodeMaintenance() { a.RemoveCheck(nodeMaintCheckID, true) a.logger.Printf("[INFO] agent: Node left maintenance mode") } + +// Query sends a Query on Serf, see Serf.Query. +func (a *Agent) Query(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) { + a.logger.Printf("[DEBUG] jfs: consul/command/agent/agent.go Query() invoked...") + a.logger.Printf("[DEBUG] agent: Requesting query send: %s. Payload: %#v", + name, string(payload)) + resp, err := a.client.Query(name, payload, params) + if err != nil { + a.logger.Printf("[WARN] agent: failed to start user query: %v", err) + } + return resp, err +} diff --git a/command/agent/ipc_query_response_stream.go b/command/agent/ipc_query_response_stream.go new file mode 100644 index 000000000000..c557831d27dc --- /dev/null +++ b/command/agent/ipc_query_response_stream.go @@ -0,0 +1,91 @@ +package agent + +import ( + "github.com/hashicorp/serf/serf" + "log" + "time" +) + +// queryResponseStream is used to stream the query results back to a client +type queryResponseStream struct { + client streamClient + logger *log.Logger + seq uint64 +} + +func newQueryResponseStream(client streamClient, seq uint64, logger *log.Logger) *queryResponseStream { + qs := &queryResponseStream{ + client: client, + logger: logger, + seq: seq, + } + return qs +} + +// Stream is a long running routine used to stream the results of a query back to a client +func (qs *queryResponseStream) Stream(resp *serf.QueryResponse) { + // Setup a timer for the query ending + remaining := resp.Deadline().Sub(time.Now()) + done := time.After(remaining) + + ackCh := resp.AckCh() + respCh := resp.ResponseCh() + for { + select { + case a := <-ackCh: + if err := qs.sendAck(a); err != nil { + qs.logger.Printf("[ERR] agent.ipc: Failed to stream ack to %v: %v", qs.client, err) + return + } + case r := <-respCh: + if err := qs.sendResponse(r.From, r.Payload); err != nil { + qs.logger.Printf("[ERR] agent.ipc: Failed to stream response to %v: %v", qs.client, err) + return + } + case <-done: + if err := qs.sendDone(); err != nil { + qs.logger.Printf("[ERR] agent.ipc: Failed to stream query end to %v: %v", qs.client, err) + } + return + } + } +} + +// sendAck is used to send a single ack +func (qs *queryResponseStream) sendAck(from string) error { + header := responseHeader{ + Seq: qs.seq, + Error: "", + } + rec := queryRecord{ + Type: queryRecordAck, + From: from, + } + return qs.client.Send(&header, &rec) +} + +// sendResponse is used to send a single response +func (qs *queryResponseStream) sendResponse(from string, payload []byte) error { + header := responseHeader{ + Seq: qs.seq, + Error: "", + } + rec := queryRecord{ + Type: queryRecordResponse, + From: from, + Payload: payload, + } + return qs.client.Send(&header, &rec) +} + +// sendDone is used to signal the end +func (qs *queryResponseStream) sendDone() error { + header := responseHeader{ + Seq: qs.seq, + Error: "", + } + rec := queryRecord{ + Type: queryRecordDone, + } + return qs.client.Send(&header, &rec) +} diff --git a/command/agent/rpc.go b/command/agent/rpc.go index dd2d376c60e9..d4101f0516f1 100644 --- a/command/agent/rpc.go +++ b/command/agent/rpc.go @@ -57,6 +57,7 @@ const ( useKeyCommand = "use-key" removeKeyCommand = "remove-key" listKeysCommand = "list-keys" + queryCommand = "query" ) const ( @@ -410,6 +411,9 @@ func (i *AgentRPC) handleRequest(client *rpcClient, reqHeader *requestHeader) er case installKeyCommand, useKeyCommand, removeKeyCommand, listKeysCommand: return i.handleKeyring(client, seq, command, token) + case queryCommand: + return i.handleQuery(client, seq) + default: respHeader := responseHeader{Seq: seq, Error: unsupportedCommand} client.Send(&respHeader, nil) @@ -578,6 +582,41 @@ func (i *AgentRPC) handleStop(client *rpcClient, seq uint64) error { return client.Send(&resp, nil) } +func (i *AgentRPC) handleQuery(client *rpcClient, seq uint64) error { + i.logger.Printf("[DEBUG] jfs: consul/command/agent/rpc.go handleQuery() invoked...") + var req queryRequest + if err := client.dec.Decode(&req); err != nil { + return fmt.Errorf("decode failed: %v", err) + } + + // Setup the query + params := serf.QueryParam{ + FilterNodes: req.FilterNodes, + FilterTags: req.FilterTags, + RequestAck: req.RequestAck, + Timeout: req.Timeout, + } + + // Start the query + queryResp, err := i.agent.Query(req.Name, req.Payload, ¶ms) + + // Stream the query responses + if err == nil { + qs := newQueryResponseStream(client, seq, i.logger) + defer func() { + go qs.Stream(queryResp) + }() + } + + // Respond + resp := responseHeader{ + Seq: seq, + Error: errToString(err), + } + + return client.Send(&resp, nil) +} + func (i *AgentRPC) handleLeave(client *rpcClient, seq uint64) error { i.logger.Printf("[INFO] agent.rpc: Graceful leave triggered") diff --git a/command/agent/rpc_client.go b/command/agent/rpc_client.go index 3ce90b1634c3..593a0322bad8 100644 --- a/command/agent/rpc_client.go +++ b/command/agent/rpc_client.go @@ -11,6 +11,7 @@ import ( "strings" "sync" "sync/atomic" + "time" ) var ( @@ -345,6 +346,161 @@ func (c *RPCClient) Monitor(level logutils.LogLevel, ch chan<- string) (StreamHa } } +// NodeResponse is used to return the response of a query +type NodeResponse struct { + From string + Payload []byte +} + +type queryHandler struct { + client *RPCClient + closed bool + init bool + initCh chan<- error + ackCh chan<- string + respCh chan<- NodeResponse + seq uint64 +} + +func (qh *queryHandler) Handle(resp *responseHeader) { + log.Printf("[DEBUG] jfs: consul/command/agent/rpc_client.go Handle() invoked...") + // Initialize on the first response + if !qh.init { + qh.init = true + qh.initCh <- strToError(resp.Error) + return + } + + // Decode the query response + var rec queryRecord + if err := qh.client.dec.Decode(&rec); err != nil { + log.Printf("[ERR] Failed to decode query response: %v", err) + qh.client.deregisterHandler(qh.seq) + return + } + + switch rec.Type { + case queryRecordAck: + select { + case qh.ackCh <- rec.From: + default: + log.Printf("[ERR] Dropping query ack, channel full") + } + + case queryRecordResponse: + select { + case qh.respCh <- NodeResponse{rec.From, rec.Payload}: + default: + log.Printf("[ERR] Dropping query response, channel full") + } + + case queryRecordDone: + // No further records coming + qh.client.deregisterHandler(qh.seq) + + default: + log.Printf("[ERR] Unrecognized query record type: %s", rec.Type) + } +} + +func (qh *queryHandler) Cleanup() { + log.Printf("[DEBUG] jfs: consul/command/agent/rpc_client.go Cleanup() invoked...") + if !qh.closed { + if !qh.init { + qh.init = true + qh.initCh <- fmt.Errorf("Stream closed") + } + if qh.ackCh != nil { + close(qh.ackCh) + } + if qh.respCh != nil { + close(qh.respCh) + } + qh.closed = true + } +} + +const ( + queryRecordAck = "ack" + queryRecordResponse = "response" + queryRecordDone = "done" +) + +type queryRecord struct { + Type string + From string + Payload []byte +} + +type queryRequest struct { + FilterNodes []string + FilterTags map[string]string + RequestAck bool + Timeout time.Duration + Name string + Payload []byte +} + +// QueryParam is provided to query set various settings. +type QueryParam struct { + FilterNodes []string // A list of node names to restrict query to + FilterTags map[string]string // A map of tag name to regex to filter on + RequestAck bool // Should nodes ack the query receipt + Timeout time.Duration // Maximum query duration. Optional, will be set automatically. + Name string // Opaque query name + Payload []byte // Opaque query payload + AckCh chan<- string // Channel to send Ack replies on + RespCh chan<- NodeResponse // Channel to send responses on +} + +// Query initiates a new query message using the given parameters, and streams +// acks and responses over the given channels. The channels will not block on +// sends and should be buffered. At the end of the query, the channels will be +// closed. +func (c *RPCClient) Query(params *QueryParam) error { + log.Printf("[DEBUG] jfs: consul/command/agent/rpc_client.go Query() invoked...") + // Setup the request + seq := c.getSeq() + header := requestHeader{ + Command: queryCommand, + Seq: seq, + } + req := queryRequest{ + FilterNodes: params.FilterNodes, + FilterTags: params.FilterTags, + RequestAck: params.RequestAck, + Timeout: params.Timeout, + Name: params.Name, + Payload: params.Payload, + } + + // Create a query handler + initCh := make(chan error, 1) + handler := &queryHandler{ + client: c, + initCh: initCh, + ackCh: params.AckCh, + respCh: params.RespCh, + seq: seq, + } + c.handleSeq(seq, handler) + + // Send the request + if err := c.send(&header, &req); err != nil { + c.deregisterHandler(seq) + return err + } + + // Wait for a response + select { + case err := <-initCh: + return err + case <-c.shutdownCh: + c.deregisterHandler(seq) + return clientClosed + } +} + // Stop is used to unsubscribe from logs or event streams func (c *RPCClient) Stop(handle StreamHandle) error { // Deregister locally first to stop delivery diff --git a/command/reachability.go b/command/reachability.go new file mode 100644 index 000000000000..ea71d93a0391 --- /dev/null +++ b/command/reachability.go @@ -0,0 +1,175 @@ +package command + +import ( + "flag" + "fmt" + "github.com/hashicorp/consul/command/agent" + "github.com/hashicorp/serf/serf" + "github.com/mitchellh/cli" + "strings" + "time" +) + +const ( + tooManyAcks = `This could mean Serf is detecting false-failures due to a misconfiguration or network issue.` + tooFewAcks = `This could mean Serf gossip packets are being lost due to a misconfiguration or network issue.` + duplicateResponses = `Duplicate responses means there is a misconfiguration. Verify that node names are unique.` + troubleshooting = ` +Troubleshooting tips: +* Ensure that the bind addr:port is accessible by all other nodes +* If an advertise address is set, ensure it routes to the bind address +* Check that no nodes are behind a NAT +* If nodes are behind firewalls or iptables, check that Serf traffic is permitted (UDP and TCP) +* Verify networking equipment is functional` +) + +// ReachabilityCommand is a Command implementation that is used to trigger +// a new reachability test +type ReachabilityCommand struct { + ShutdownCh <-chan struct{} + Ui cli.Ui +} + +func (c *ReachabilityCommand) Help() string { + helpText := ` +Usage: serf reachability [options] + + Tests the network reachability of this node + +Options: + + -rpc-addr=127.0.0.1:8400 RPC address of the Serf agent. + -rpc-auth="" RPC auth token of the Serf agent. + -verbose Verbose mode +` + return strings.TrimSpace(helpText) +} + +func (c *ReachabilityCommand) Run(args []string) int { + var verbose bool + cmdFlags := flag.NewFlagSet("reachability", flag.ContinueOnError) + cmdFlags.Usage = func() { c.Ui.Output(c.Help()) } + cmdFlags.BoolVar(&verbose, "verbose", false, "verbose mode") + rpcAddr := RPCAddrFlag(cmdFlags) + if err := cmdFlags.Parse(args); err != nil { + return 1 + } + + cl, err := RPCClient(*rpcAddr) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error connecting to Serf agent: %s", err)) + return 1 + } + defer cl.Close() + + ackCh := make(chan string, 128) + + // Get the list of LAN members + var members []agent.Member + members, err = cl.LANMembers() + if err != nil { + c.Ui.Error(fmt.Sprintf("Error getting members: %s", err)) + return 1 + } + if members == nil { + c.Ui.Error(fmt.Sprintf("Well, crap.")) + return 1 + } + + // Get only the live members + liveMembers := make(map[string]struct{}) + for _, m := range members { + if m.Status == "alive" { + liveMembers[m.Name] = struct{}{} + } + } + c.Ui.Output(fmt.Sprintf("Total members: %d, live members: %d", len(members), len(liveMembers))) + + // Start the query + params := agent.QueryParam{ + RequestAck: true, + Name: serf.InternalQueryPrefix + "ping", + AckCh: ackCh, + } + if err := cl.Query(¶ms); err != nil { + c.Ui.Error(fmt.Sprintf("Error sending query: %s", err)) + return 1 + } + c.Ui.Output("Starting reachability test...") + start := time.Now() + last := time.Now() + + // Track responses and acknowledgements + exit := 0 + dups := false + numAcks := 0 + acksFrom := make(map[string]struct{}, len(members)) + +OUTER: + for { + select { + case a := <-ackCh: + if a == "" { + break OUTER + } + if verbose { + c.Ui.Output(fmt.Sprintf("\tAck from '%s'", a)) + } + numAcks++ + if _, ok := acksFrom[a]; ok { + dups = true + c.Ui.Output(fmt.Sprintf("Duplicate response from '%v'", a)) + } + acksFrom[a] = struct{}{} + last = time.Now() + + case <-c.ShutdownCh: + c.Ui.Error("Test interrupted") + return 1 + } + } + + if verbose { + total := float64(time.Now().Sub(start)) / float64(time.Second) + timeToLast := float64(last.Sub(start)) / float64(time.Second) + c.Ui.Output(fmt.Sprintf("Query time: %0.2f sec, time to last response: %0.2f sec", total, timeToLast)) + } + + // Print troubleshooting info for duplicate responses + if dups { + c.Ui.Output(duplicateResponses) + exit = 1 + } + + n := len(liveMembers) + if numAcks == n { + c.Ui.Output("Successfully contacted all live nodes") + + } else if numAcks > n { + c.Ui.Output("Received more acks than live nodes! Acks from non-live nodes:") + for m := range acksFrom { + if _, ok := liveMembers[m]; !ok { + c.Ui.Output(fmt.Sprintf("\t%s", m)) + } + } + c.Ui.Output(tooManyAcks) + c.Ui.Output(troubleshooting) + return 1 + + } else if numAcks < n { + c.Ui.Output("Received less acks than live nodes! Missing acks from:") + for m := range liveMembers { + if _, ok := acksFrom[m]; !ok { + c.Ui.Output(fmt.Sprintf("\t%s", m)) + } + } + c.Ui.Output(tooFewAcks) + c.Ui.Output(troubleshooting) + return 1 + } + return exit +} + +func (c *ReachabilityCommand) Synopsis() string { + return "Test network reachability" +} diff --git a/commands.go b/commands.go index 57c640ab5a7f..5f501d6e822c 100644 --- a/commands.go +++ b/commands.go @@ -108,6 +108,12 @@ func init() { }, nil }, + "reachability": func() (cli.Command, error) { + return &command.ReachabilityCommand{ + Ui: ui, + }, nil + }, + "reload": func() (cli.Command, error) { return &command.ReloadCommand{ Ui: ui, diff --git a/consul/client.go b/consul/client.go index b3fdb080e880..93659d9ab609 100644 --- a/consul/client.go +++ b/consul/client.go @@ -384,3 +384,20 @@ func (c *Client) Stats() map[string]map[string]string { func (c *Client) GetCoordinate() (*coordinate.Coordinate, error) { return c.serf.GetCoordinate() } + +func (c *Client) Query(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) { + // Prevent the use of the internal prefix + if strings.HasPrefix(name, serf.InternalQueryPrefix) { + // Allow the special "ping" query + if name != serf.InternalQueryPrefix+"ping" || payload != nil { + return nil, fmt.Errorf("Queries cannot contain the '%s' prefix", serf.InternalQueryPrefix) + } + } + c.logger.Printf("[DEBUG] client: Requesting query send: %s. Payload: %#v", + name, string(payload)) + resp, err := c.serf.Query(name, payload, params) + if err != nil { + c.logger.Printf("[WARN] client: failed to start user query: %v", err) + } + return resp, err +} From a1bb3af25c8041beee3fccb3930efb451cd8b55a Mon Sep 17 00:00:00 2001 From: John Sullivan Date: Thu, 8 Oct 2015 10:51:06 -0400 Subject: [PATCH 02/13] Remove debugging comments to reduce logging verbosity. --- command/agent/agent.go | 1 - command/agent/rpc.go | 1 - command/agent/rpc_client.go | 3 --- 3 files changed, 5 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index f50e4eab4bd8..f11ab93f906f 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -1459,7 +1459,6 @@ func (a *Agent) DisableNodeMaintenance() { // Query sends a Query on Serf, see Serf.Query. func (a *Agent) Query(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) { - a.logger.Printf("[DEBUG] jfs: consul/command/agent/agent.go Query() invoked...") a.logger.Printf("[DEBUG] agent: Requesting query send: %s. Payload: %#v", name, string(payload)) resp, err := a.client.Query(name, payload, params) diff --git a/command/agent/rpc.go b/command/agent/rpc.go index d4101f0516f1..24be0f968f1d 100644 --- a/command/agent/rpc.go +++ b/command/agent/rpc.go @@ -583,7 +583,6 @@ func (i *AgentRPC) handleStop(client *rpcClient, seq uint64) error { } func (i *AgentRPC) handleQuery(client *rpcClient, seq uint64) error { - i.logger.Printf("[DEBUG] jfs: consul/command/agent/rpc.go handleQuery() invoked...") var req queryRequest if err := client.dec.Decode(&req); err != nil { return fmt.Errorf("decode failed: %v", err) diff --git a/command/agent/rpc_client.go b/command/agent/rpc_client.go index 593a0322bad8..f3c27d115049 100644 --- a/command/agent/rpc_client.go +++ b/command/agent/rpc_client.go @@ -363,7 +363,6 @@ type queryHandler struct { } func (qh *queryHandler) Handle(resp *responseHeader) { - log.Printf("[DEBUG] jfs: consul/command/agent/rpc_client.go Handle() invoked...") // Initialize on the first response if !qh.init { qh.init = true @@ -404,7 +403,6 @@ func (qh *queryHandler) Handle(resp *responseHeader) { } func (qh *queryHandler) Cleanup() { - log.Printf("[DEBUG] jfs: consul/command/agent/rpc_client.go Cleanup() invoked...") if !qh.closed { if !qh.init { qh.init = true @@ -458,7 +456,6 @@ type QueryParam struct { // sends and should be buffered. At the end of the query, the channels will be // closed. func (c *RPCClient) Query(params *QueryParam) error { - log.Printf("[DEBUG] jfs: consul/command/agent/rpc_client.go Query() invoked...") // Setup the request seq := c.getSeq() header := requestHeader{ From d49563793cd2c7be011e21184c70ebcd3feef197 Mon Sep 17 00:00:00 2001 From: John Sullivan Date: Thu, 8 Oct 2015 12:35:53 -0400 Subject: [PATCH 03/13] Fix awkward wording and grammar in QueryParam comment. --- command/agent/rpc_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/command/agent/rpc_client.go b/command/agent/rpc_client.go index f3c27d115049..359089233dea 100644 --- a/command/agent/rpc_client.go +++ b/command/agent/rpc_client.go @@ -439,7 +439,7 @@ type queryRequest struct { Payload []byte } -// QueryParam is provided to query set various settings. +// QueryParam is provided to customize various query settings. type QueryParam struct { FilterNodes []string // A list of node names to restrict query to FilterTags map[string]string // A map of tag name to regex to filter on From 2c055d09a043ea735218a1a4e0ca0ddaa6e4292d Mon Sep 17 00:00:00 2001 From: John Sullivan Date: Thu, 8 Oct 2015 13:31:34 -0400 Subject: [PATCH 04/13] Rename 'Query' to 'SerfQuery' and make sure associated data structures, functions, and log messages are appropriately named. --- command/agent/agent.go | 10 ++-- ...m.go => ipc_serf_query_response_stream.go} | 38 ++++++------- command/agent/rpc.go | 22 ++++---- command/agent/rpc_client.go | 55 ++++++++++--------- command/reachability.go | 20 +++---- consul/client.go | 8 +-- 6 files changed, 75 insertions(+), 78 deletions(-) rename command/agent/{ipc_query_response_stream.go => ipc_serf_query_response_stream.go} (51%) diff --git a/command/agent/agent.go b/command/agent/agent.go index f11ab93f906f..3136f6050c5f 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -1457,13 +1457,13 @@ func (a *Agent) DisableNodeMaintenance() { a.logger.Printf("[INFO] agent: Node left maintenance mode") } -// Query sends a Query on Serf, see Serf.Query. -func (a *Agent) Query(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) { - a.logger.Printf("[DEBUG] agent: Requesting query send: %s. Payload: %#v", +// SerfQuery sends a Query on Serf, see Serf.Query. +func (a *Agent) SerfQuery(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) { + a.logger.Printf("[DEBUG] agent: Requesting serf query send: %s. Payload: %#v", name, string(payload)) - resp, err := a.client.Query(name, payload, params) + resp, err := a.client.SerfQuery(name, payload, params) if err != nil { - a.logger.Printf("[WARN] agent: failed to start user query: %v", err) + a.logger.Printf("[WARN] agent: failed to start user serf query: %v", err) } return resp, err } diff --git a/command/agent/ipc_query_response_stream.go b/command/agent/ipc_serf_query_response_stream.go similarity index 51% rename from command/agent/ipc_query_response_stream.go rename to command/agent/ipc_serf_query_response_stream.go index c557831d27dc..b2b86602df58 100644 --- a/command/agent/ipc_query_response_stream.go +++ b/command/agent/ipc_serf_query_response_stream.go @@ -6,15 +6,15 @@ import ( "time" ) -// queryResponseStream is used to stream the query results back to a client -type queryResponseStream struct { +// serfQueryResponseStream is used to stream the serf query results back to a client +type serfQueryResponseStream struct { client streamClient logger *log.Logger seq uint64 } -func newQueryResponseStream(client streamClient, seq uint64, logger *log.Logger) *queryResponseStream { - qs := &queryResponseStream{ +func newSerfQueryResponseStream(client streamClient, seq uint64, logger *log.Logger) *serfQueryResponseStream { + qs := &serfQueryResponseStream{ client: client, logger: logger, seq: seq, @@ -22,9 +22,9 @@ func newQueryResponseStream(client streamClient, seq uint64, logger *log.Logger) return qs } -// Stream is a long running routine used to stream the results of a query back to a client -func (qs *queryResponseStream) Stream(resp *serf.QueryResponse) { - // Setup a timer for the query ending +// Stream is a long running routine used to stream the results of a serf query back to a client +func (qs *serfQueryResponseStream) Stream(resp *serf.QueryResponse) { + // Setup a timer for the serf query ending remaining := resp.Deadline().Sub(time.Now()) done := time.After(remaining) @@ -34,17 +34,17 @@ func (qs *queryResponseStream) Stream(resp *serf.QueryResponse) { select { case a := <-ackCh: if err := qs.sendAck(a); err != nil { - qs.logger.Printf("[ERR] agent.ipc: Failed to stream ack to %v: %v", qs.client, err) + qs.logger.Printf("[ERR] agent.ipc: Failed to stream serf query ack to %v: %v", qs.client, err) return } case r := <-respCh: if err := qs.sendResponse(r.From, r.Payload); err != nil { - qs.logger.Printf("[ERR] agent.ipc: Failed to stream response to %v: %v", qs.client, err) + qs.logger.Printf("[ERR] agent.ipc: Failed to stream serf query response to %v: %v", qs.client, err) return } case <-done: if err := qs.sendDone(); err != nil { - qs.logger.Printf("[ERR] agent.ipc: Failed to stream query end to %v: %v", qs.client, err) + qs.logger.Printf("[ERR] agent.ipc: Failed to stream serf query end to %v: %v", qs.client, err) } return } @@ -52,26 +52,26 @@ func (qs *queryResponseStream) Stream(resp *serf.QueryResponse) { } // sendAck is used to send a single ack -func (qs *queryResponseStream) sendAck(from string) error { +func (qs *serfQueryResponseStream) sendAck(from string) error { header := responseHeader{ Seq: qs.seq, Error: "", } - rec := queryRecord{ - Type: queryRecordAck, + rec := serfQueryRecord{ + Type: serfQueryRecordAck, From: from, } return qs.client.Send(&header, &rec) } // sendResponse is used to send a single response -func (qs *queryResponseStream) sendResponse(from string, payload []byte) error { +func (qs *serfQueryResponseStream) sendResponse(from string, payload []byte) error { header := responseHeader{ Seq: qs.seq, Error: "", } - rec := queryRecord{ - Type: queryRecordResponse, + rec := serfQueryRecord{ + Type: serfQueryRecordResponse, From: from, Payload: payload, } @@ -79,13 +79,13 @@ func (qs *queryResponseStream) sendResponse(from string, payload []byte) error { } // sendDone is used to signal the end -func (qs *queryResponseStream) sendDone() error { +func (qs *serfQueryResponseStream) sendDone() error { header := responseHeader{ Seq: qs.seq, Error: "", } - rec := queryRecord{ - Type: queryRecordDone, + rec := serfQueryRecord{ + Type: serfQueryRecordDone, } return qs.client.Send(&header, &rec) } diff --git a/command/agent/rpc.go b/command/agent/rpc.go index 24be0f968f1d..612f65022647 100644 --- a/command/agent/rpc.go +++ b/command/agent/rpc.go @@ -57,7 +57,7 @@ const ( useKeyCommand = "use-key" removeKeyCommand = "remove-key" listKeysCommand = "list-keys" - queryCommand = "query" + serfQueryCommand = "serf-query" ) const ( @@ -411,8 +411,8 @@ func (i *AgentRPC) handleRequest(client *rpcClient, reqHeader *requestHeader) er case installKeyCommand, useKeyCommand, removeKeyCommand, listKeysCommand: return i.handleKeyring(client, seq, command, token) - case queryCommand: - return i.handleQuery(client, seq) + case serfQueryCommand: + return i.handleSerfQuery(client, seq) default: respHeader := responseHeader{Seq: seq, Error: unsupportedCommand} @@ -582,13 +582,13 @@ func (i *AgentRPC) handleStop(client *rpcClient, seq uint64) error { return client.Send(&resp, nil) } -func (i *AgentRPC) handleQuery(client *rpcClient, seq uint64) error { - var req queryRequest +func (i *AgentRPC) handleSerfQuery(client *rpcClient, seq uint64) error { + var req serfQueryRequest if err := client.dec.Decode(&req); err != nil { return fmt.Errorf("decode failed: %v", err) } - // Setup the query + // Setup the serf query params := serf.QueryParam{ FilterNodes: req.FilterNodes, FilterTags: req.FilterTags, @@ -596,14 +596,14 @@ func (i *AgentRPC) handleQuery(client *rpcClient, seq uint64) error { Timeout: req.Timeout, } - // Start the query - queryResp, err := i.agent.Query(req.Name, req.Payload, ¶ms) + // Start the serf query + serfQueryResp, err := i.agent.SerfQuery(req.Name, req.Payload, ¶ms) - // Stream the query responses + // Stream the serf query responses if err == nil { - qs := newQueryResponseStream(client, seq, i.logger) + qs := newSerfQueryResponseStream(client, seq, i.logger) defer func() { - go qs.Stream(queryResp) + go qs.Stream(serfQueryResp) }() } diff --git a/command/agent/rpc_client.go b/command/agent/rpc_client.go index 359089233dea..ac85a2df4f60 100644 --- a/command/agent/rpc_client.go +++ b/command/agent/rpc_client.go @@ -346,13 +346,13 @@ func (c *RPCClient) Monitor(level logutils.LogLevel, ch chan<- string) (StreamHa } } -// NodeResponse is used to return the response of a query +// NodeResponse is used to return the response of a serf query type NodeResponse struct { From string Payload []byte } -type queryHandler struct { +type serfQueryHandler struct { client *RPCClient closed bool init bool @@ -362,7 +362,7 @@ type queryHandler struct { seq uint64 } -func (qh *queryHandler) Handle(resp *responseHeader) { +func (qh *serfQueryHandler) Handle(resp *responseHeader) { // Initialize on the first response if !qh.init { qh.init = true @@ -371,38 +371,38 @@ func (qh *queryHandler) Handle(resp *responseHeader) { } // Decode the query response - var rec queryRecord + var rec serfQueryRecord if err := qh.client.dec.Decode(&rec); err != nil { - log.Printf("[ERR] Failed to decode query response: %v", err) + log.Printf("[ERR] Failed to decode serf query response: %v", err) qh.client.deregisterHandler(qh.seq) return } switch rec.Type { - case queryRecordAck: + case serfQueryRecordAck: select { case qh.ackCh <- rec.From: default: - log.Printf("[ERR] Dropping query ack, channel full") + log.Printf("[ERR] Dropping serf query ack, channel full") } - case queryRecordResponse: + case serfQueryRecordResponse: select { case qh.respCh <- NodeResponse{rec.From, rec.Payload}: default: - log.Printf("[ERR] Dropping query response, channel full") + log.Printf("[ERR] Dropping serf query response, channel full") } - case queryRecordDone: + case serfQueryRecordDone: // No further records coming qh.client.deregisterHandler(qh.seq) default: - log.Printf("[ERR] Unrecognized query record type: %s", rec.Type) + log.Printf("[ERR] Unrecognized serf query record type: %s", rec.Type) } } -func (qh *queryHandler) Cleanup() { +func (qh *serfQueryHandler) Cleanup() { if !qh.closed { if !qh.init { qh.init = true @@ -419,18 +419,18 @@ func (qh *queryHandler) Cleanup() { } const ( - queryRecordAck = "ack" - queryRecordResponse = "response" - queryRecordDone = "done" + serfQueryRecordAck = "ack" + serfQueryRecordResponse = "response" + serfQueryRecordDone = "done" ) -type queryRecord struct { +type serfQueryRecord struct { Type string From string Payload []byte } -type queryRequest struct { +type serfQueryRequest struct { FilterNodes []string FilterTags map[string]string RequestAck bool @@ -439,8 +439,9 @@ type queryRequest struct { Payload []byte } -// QueryParam is provided to customize various query settings. -type QueryParam struct { +// SerfQueryParam is provided to customize various serf query +// settings. +type SerfQueryParam struct { FilterNodes []string // A list of node names to restrict query to FilterTags map[string]string // A map of tag name to regex to filter on RequestAck bool // Should nodes ack the query receipt @@ -451,18 +452,18 @@ type QueryParam struct { RespCh chan<- NodeResponse // Channel to send responses on } -// Query initiates a new query message using the given parameters, and streams -// acks and responses over the given channels. The channels will not block on -// sends and should be buffered. At the end of the query, the channels will be -// closed. -func (c *RPCClient) Query(params *QueryParam) error { +// SerfQuery initiates a new query message using the given parameters, +// and streams acks and responses over the given channels. The +// channels will not block on sends and should be buffered. At the end +// of the query, the channels will be closed. +func (c *RPCClient) SerfQuery(params *SerfQueryParam) error { // Setup the request seq := c.getSeq() header := requestHeader{ - Command: queryCommand, + Command: serfQueryCommand, Seq: seq, } - req := queryRequest{ + req := serfQueryRequest{ FilterNodes: params.FilterNodes, FilterTags: params.FilterTags, RequestAck: params.RequestAck, @@ -473,7 +474,7 @@ func (c *RPCClient) Query(params *QueryParam) error { // Create a query handler initCh := make(chan error, 1) - handler := &queryHandler{ + handler := &serfQueryHandler{ client: c, initCh: initCh, ackCh: params.AckCh, diff --git a/command/reachability.go b/command/reachability.go index ea71d93a0391..070be637fd45 100644 --- a/command/reachability.go +++ b/command/reachability.go @@ -32,14 +32,14 @@ type ReachabilityCommand struct { func (c *ReachabilityCommand) Help() string { helpText := ` -Usage: serf reachability [options] +Usage: consul reachability [options] Tests the network reachability of this node Options: - -rpc-addr=127.0.0.1:8400 RPC address of the Serf agent. - -rpc-auth="" RPC auth token of the Serf agent. + -rpc-addr=127.0.0.1:8400 RPC address of the Consul agent. + -rpc-auth="" RPC auth token of the Consul agent. -verbose Verbose mode ` return strings.TrimSpace(helpText) @@ -57,7 +57,7 @@ func (c *ReachabilityCommand) Run(args []string) int { cl, err := RPCClient(*rpcAddr) if err != nil { - c.Ui.Error(fmt.Sprintf("Error connecting to Serf agent: %s", err)) + c.Ui.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err)) return 1 } defer cl.Close() @@ -71,10 +71,6 @@ func (c *ReachabilityCommand) Run(args []string) int { c.Ui.Error(fmt.Sprintf("Error getting members: %s", err)) return 1 } - if members == nil { - c.Ui.Error(fmt.Sprintf("Well, crap.")) - return 1 - } // Get only the live members liveMembers := make(map[string]struct{}) @@ -86,13 +82,13 @@ func (c *ReachabilityCommand) Run(args []string) int { c.Ui.Output(fmt.Sprintf("Total members: %d, live members: %d", len(members), len(liveMembers))) // Start the query - params := agent.QueryParam{ + params := agent.SerfQueryParam{ RequestAck: true, Name: serf.InternalQueryPrefix + "ping", AckCh: ackCh, } - if err := cl.Query(¶ms); err != nil { - c.Ui.Error(fmt.Sprintf("Error sending query: %s", err)) + if err := cl.SerfQuery(¶ms); err != nil { + c.Ui.Error(fmt.Sprintf("Error sending serf query: %s", err)) return 1 } c.Ui.Output("Starting reachability test...") @@ -157,7 +153,7 @@ OUTER: return 1 } else if numAcks < n { - c.Ui.Output("Received less acks than live nodes! Missing acks from:") + c.Ui.Output("Received fewer acks than live nodes! Missing acks from:") for m := range liveMembers { if _, ok := acksFrom[m]; !ok { c.Ui.Output(fmt.Sprintf("\t%s", m)) diff --git a/consul/client.go b/consul/client.go index 93659d9ab609..4546e086c414 100644 --- a/consul/client.go +++ b/consul/client.go @@ -385,19 +385,19 @@ func (c *Client) GetCoordinate() (*coordinate.Coordinate, error) { return c.serf.GetCoordinate() } -func (c *Client) Query(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) { +func (c *Client) SerfQuery(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) { // Prevent the use of the internal prefix if strings.HasPrefix(name, serf.InternalQueryPrefix) { // Allow the special "ping" query if name != serf.InternalQueryPrefix+"ping" || payload != nil { - return nil, fmt.Errorf("Queries cannot contain the '%s' prefix", serf.InternalQueryPrefix) + return nil, fmt.Errorf("Serf Queries cannot contain the '%s' prefix", serf.InternalQueryPrefix) } } - c.logger.Printf("[DEBUG] client: Requesting query send: %s. Payload: %#v", + c.logger.Printf("[DEBUG] client: Requesting serf query send: %s. Payload: %#v", name, string(payload)) resp, err := c.serf.Query(name, payload, params) if err != nil { - c.logger.Printf("[WARN] client: failed to start user query: %v", err) + c.logger.Printf("[WARN] client: failed to start user serf query: %v", err) } return resp, err } From 2cb16439cc6dd838ba60fd4f4ba72885f62af1c8 Mon Sep 17 00:00:00 2001 From: John Sullivan Date: Tue, 13 Oct 2015 14:48:06 -0400 Subject: [PATCH 05/13] Add documentation for reachability commands - largely copied from Serf sources. --- .../source/docs/commands/index.html.markdown | 1 + .../docs/commands/reachability.html.markdown | 47 +++++++++++++++++++ website/source/layouts/docs.erb | 4 ++ 3 files changed, 52 insertions(+) create mode 100644 website/source/docs/commands/reachability.html.markdown diff --git a/website/source/docs/commands/index.html.markdown b/website/source/docs/commands/index.html.markdown index e466a94b4460..1f414d42adbc 100644 --- a/website/source/docs/commands/index.html.markdown +++ b/website/source/docs/commands/index.html.markdown @@ -38,6 +38,7 @@ Available commands are: lock Execute a command holding a lock members Lists the members of a Consul cluster monitor Stream logs from a Consul agent + reachability Test network reachability reload Triggers the agent to reload configuration files rtt Estimates network round trip time between nodes version Prints the Consul version diff --git a/website/source/docs/commands/reachability.html.markdown b/website/source/docs/commands/reachability.html.markdown new file mode 100644 index 000000000000..e63a13ae8e8f --- /dev/null +++ b/website/source/docs/commands/reachability.html.markdown @@ -0,0 +1,47 @@ +--- +layout: "docs" +page_title: "Commands: Reachability" +sidebar_current: "docs-commands-reachability" +description: |- + The `reachability` command performs a basic network reachability test. The local node will gossip out a Serf ping message and request that all other nodes acknowledge delivery of the message. +--- + +# Consul Reachability + +Command: `consul reachability` + +The `reachability` command performs a basic network reachability test. +The local node will gossip out a Serf "ping" message and request that +all other nodes acknowledge delivery of the message. + +This can be used to troubleshoot configurations or network issues, since +nodes that are detected as having failed may respond, indicating false-failure +detection, or live nodes may fail to respond, indicating networking issues. + +In general, the following troubleshooting tips are recommended: + +* Ensure that the bind addr:port is accessible by all other nodes +* If an advertise address is set, ensure it routes to the bind address +* Check that no nodes are behind a NAT +* If nodes are behind firewalls or iptables, check that Serf traffic is permitted (UDP and TCP) +* Verify networking equipment is functional + +## Usage + +Usage: `consul reachability [options]` + +The following command-line options are available for this command. +Every option is optional: + +* `-rpc-addr` - Address to the RPC server of the agent you want to contact + to send this command. If this isn't specified, the command will contact + "127.0.0.1:8400" which is the default RPC address of a Consul + agent. This option can also be controlled using the `CONSUL_RPC_ADDR` + environment variable. + +* `-rpc-auth` - Optional RPC auth token. If the agent is configured to use + an auth token, then this must be provided or the agent will refuse the + command. This option can also be controlled using the `CONSUL_RPC_AUTH` + environment variable. + +* `-verbose` - Enables verbose output diff --git a/website/source/layouts/docs.erb b/website/source/layouts/docs.erb index 166b1655526b..02a4dbdd8489 100644 --- a/website/source/layouts/docs.erb +++ b/website/source/layouts/docs.erb @@ -122,6 +122,10 @@ info + > + reachability + + > reload From 2fe8ea64babbcce7f99c917d63b4910315059af4 Mon Sep 17 00:00:00 2001 From: John Sullivan Date: Wed, 14 Oct 2015 13:56:19 -0400 Subject: [PATCH 06/13] Remove '-rpc-auth' option from help text and documentation of 'reachability' command. --- command/reachability.go | 1 - website/source/docs/commands/reachability.html.markdown | 5 ----- 2 files changed, 6 deletions(-) diff --git a/command/reachability.go b/command/reachability.go index 070be637fd45..0943c8c195ac 100644 --- a/command/reachability.go +++ b/command/reachability.go @@ -39,7 +39,6 @@ Usage: consul reachability [options] Options: -rpc-addr=127.0.0.1:8400 RPC address of the Consul agent. - -rpc-auth="" RPC auth token of the Consul agent. -verbose Verbose mode ` return strings.TrimSpace(helpText) diff --git a/website/source/docs/commands/reachability.html.markdown b/website/source/docs/commands/reachability.html.markdown index e63a13ae8e8f..41e6f98fc62b 100644 --- a/website/source/docs/commands/reachability.html.markdown +++ b/website/source/docs/commands/reachability.html.markdown @@ -39,9 +39,4 @@ Every option is optional: agent. This option can also be controlled using the `CONSUL_RPC_ADDR` environment variable. -* `-rpc-auth` - Optional RPC auth token. If the agent is configured to use - an auth token, then this must be provided or the agent will refuse the - command. This option can also be controlled using the `CONSUL_RPC_AUTH` - environment variable. - * `-verbose` - Enables verbose output From bd87efe9791a43199e4f83567fbf45711e574669 Mon Sep 17 00:00:00 2001 From: John Sullivan Date: Wed, 14 Oct 2015 14:00:43 -0400 Subject: [PATCH 07/13] Initial port of 'reachability' test from Serf project. Currently fails - 'reachability' does not work on servers, only clients. --- command/reachability_test.go | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 command/reachability_test.go diff --git a/command/reachability_test.go b/command/reachability_test.go new file mode 100644 index 000000000000..77b3c055d030 --- /dev/null +++ b/command/reachability_test.go @@ -0,0 +1,32 @@ +package command + +import ( + "github.com/mitchellh/cli" + "strings" + "testing" +) + +func TestReachabilityCommand_implements(t *testing.T) { + var _ cli.Command = &ReachabilityCommand{} +} + +func TestReachabilityCommand_Run(t *testing.T) { + a1 := testAgent(t) + defer a1.Shutdown() + + ui := new(cli.MockUi) + c := &ReachabilityCommand{Ui: ui} + args := []string{"-rpc-addr=" + a1.addr, "-verbose"} + + code := c.Run(args) + if code != 0 { + t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String()) + } + + if !strings.Contains(ui.OutputWriter.String(), a1.config.NodeName) { + t.Fatalf("bad: %#v", ui.OutputWriter.String()) + } + if !strings.Contains(ui.OutputWriter.String(), "Successfully") { + t.Fatalf("bad: %#v", ui.OutputWriter.String()) + } +} From 40382eba175893d707154c598ee1561c0fa8a9bb Mon Sep 17 00:00:00 2001 From: John Sullivan Date: Fri, 16 Oct 2015 11:10:25 -0400 Subject: [PATCH 08/13] Initial attempt to refactor SerfQuery so it works for both server and client configurations. --- command/agent/agent.go | 8 +++- command/agent/rpc_client.go | 26 ++---------- command/reachability.go | 3 +- consul/client.go | 27 ++++++------- consul/serfquery.go | 80 +++++++++++++++++++++++++++++++++++++ consul/server.go | 15 +++++++ scripts/test.sh | 6 ++- website/Gemfile | 2 +- 8 files changed, 126 insertions(+), 41 deletions(-) create mode 100644 consul/serfquery.go diff --git a/command/agent/agent.go b/command/agent/agent.go index 3136f6050c5f..b120a0a15394 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -1461,7 +1461,13 @@ func (a *Agent) DisableNodeMaintenance() { func (a *Agent) SerfQuery(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) { a.logger.Printf("[DEBUG] agent: Requesting serf query send: %s. Payload: %#v", name, string(payload)) - resp, err := a.client.SerfQuery(name, payload, params) + var resp *serf.QueryResponse + var err error + if a.server != nil { + resp, err = a.server.SerfQuery(name, payload, params) + } else { + resp, err = a.client.SerfQuery(name, payload, params) + } if err != nil { a.logger.Printf("[WARN] agent: failed to start user serf query: %v", err) } diff --git a/command/agent/rpc_client.go b/command/agent/rpc_client.go index ac85a2df4f60..21c65ec986ab 100644 --- a/command/agent/rpc_client.go +++ b/command/agent/rpc_client.go @@ -3,6 +3,7 @@ package agent import ( "bufio" "fmt" + "github.com/hashicorp/consul/consul" "github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/logutils" "log" @@ -346,19 +347,13 @@ func (c *RPCClient) Monitor(level logutils.LogLevel, ch chan<- string) (StreamHa } } -// NodeResponse is used to return the response of a serf query -type NodeResponse struct { - From string - Payload []byte -} - type serfQueryHandler struct { client *RPCClient closed bool init bool initCh chan<- error ackCh chan<- string - respCh chan<- NodeResponse + respCh chan<- consul.NodeResponse seq uint64 } @@ -388,7 +383,7 @@ func (qh *serfQueryHandler) Handle(resp *responseHeader) { case serfQueryRecordResponse: select { - case qh.respCh <- NodeResponse{rec.From, rec.Payload}: + case qh.respCh <- consul.NodeResponse{rec.From, rec.Payload}: default: log.Printf("[ERR] Dropping serf query response, channel full") } @@ -439,24 +434,11 @@ type serfQueryRequest struct { Payload []byte } -// SerfQueryParam is provided to customize various serf query -// settings. -type SerfQueryParam struct { - FilterNodes []string // A list of node names to restrict query to - FilterTags map[string]string // A map of tag name to regex to filter on - RequestAck bool // Should nodes ack the query receipt - Timeout time.Duration // Maximum query duration. Optional, will be set automatically. - Name string // Opaque query name - Payload []byte // Opaque query payload - AckCh chan<- string // Channel to send Ack replies on - RespCh chan<- NodeResponse // Channel to send responses on -} - // SerfQuery initiates a new query message using the given parameters, // and streams acks and responses over the given channels. The // channels will not block on sends and should be buffered. At the end // of the query, the channels will be closed. -func (c *RPCClient) SerfQuery(params *SerfQueryParam) error { +func (c *RPCClient) SerfQuery(params *consul.SerfQueryParam) error { // Setup the request seq := c.getSeq() header := requestHeader{ diff --git a/command/reachability.go b/command/reachability.go index 0943c8c195ac..597b1f6fd112 100644 --- a/command/reachability.go +++ b/command/reachability.go @@ -4,6 +4,7 @@ import ( "flag" "fmt" "github.com/hashicorp/consul/command/agent" + "github.com/hashicorp/consul/consul" "github.com/hashicorp/serf/serf" "github.com/mitchellh/cli" "strings" @@ -81,7 +82,7 @@ func (c *ReachabilityCommand) Run(args []string) int { c.Ui.Output(fmt.Sprintf("Total members: %d, live members: %d", len(members), len(liveMembers))) // Start the query - params := agent.SerfQueryParam{ + params := consul.SerfQueryParam{ RequestAck: true, Name: serf.InternalQueryPrefix + "ping", AckCh: ackCh, diff --git a/consul/client.go b/consul/client.go index 4546e086c414..4ea24951cfb5 100644 --- a/consul/client.go +++ b/consul/client.go @@ -63,6 +63,9 @@ type Client struct { // which contains all the DC nodes serf *serf.Serf + // serfQuery is used to perform 'reachability' tests + serfQuery *SerfQuery + shutdown bool shutdownCh chan struct{} shutdownLock sync.Mutex @@ -119,6 +122,14 @@ func NewClient(config *Config) (*Client, error) { c.Shutdown() return nil, fmt.Errorf("Failed to start lan serf: %v", err) } + + // Initialize the SerfQuery object + c.serfQuery, err = NewSerfQuery(config, c.serf) + if err != nil { + c.Shutdown() + return nil, fmt.Errorf("Failed to initialize serfQuery: %v", err) + } + return c, nil } @@ -386,18 +397,6 @@ func (c *Client) GetCoordinate() (*coordinate.Coordinate, error) { } func (c *Client) SerfQuery(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) { - // Prevent the use of the internal prefix - if strings.HasPrefix(name, serf.InternalQueryPrefix) { - // Allow the special "ping" query - if name != serf.InternalQueryPrefix+"ping" || payload != nil { - return nil, fmt.Errorf("Serf Queries cannot contain the '%s' prefix", serf.InternalQueryPrefix) - } - } - c.logger.Printf("[DEBUG] client: Requesting serf query send: %s. Payload: %#v", - name, string(payload)) - resp, err := c.serf.Query(name, payload, params) - if err != nil { - c.logger.Printf("[WARN] client: failed to start user serf query: %v", err) - } - return resp, err + qr, err := c.serfQuery.Query(name, payload, params) + return qr, err } diff --git a/consul/serfquery.go b/consul/serfquery.go new file mode 100644 index 000000000000..8c14163f4730 --- /dev/null +++ b/consul/serfquery.go @@ -0,0 +1,80 @@ +package consul + +import ( + "fmt" + "github.com/hashicorp/serf/serf" + "log" + "os" + "strings" + "time" +) + +// NodeResponse is used to return the response of a serf query +type NodeResponse struct { + From string + Payload []byte +} + +// SerfQueryParam is provided to customize various serf query +// settings. +type SerfQueryParam struct { + FilterNodes []string // A list of node names to restrict query to + FilterTags map[string]string // A map of tag name to regex to filter on + RequestAck bool // Should nodes ack the query receipt + Timeout time.Duration // Maximum query duration. Optional, will be set automatically. + Name string // Opaque query name + Payload []byte // Opaque query payload + AckCh chan<- string // Channel to send Ack replies on + RespCh chan<- NodeResponse // Channel to send responses on +} + +// SerfQuery handles 'reachability' Serf 'pings' for both the client +// and server. +type SerfQuery struct { + // Logger uses the provided LogOutput + logger *log.Logger + + // serf is the Serf cluster maintained inside the DC + // which contains all the DC nodes + serf *serf.Serf +} + +func NewSerfQuery(config *Config, serf *serf.Serf) (*SerfQuery, error) { + // Check the protocol version + if err := config.CheckVersion(); err != nil { + return nil, err + } + + // Ensure we have a log output + if config.LogOutput == nil { + config.LogOutput = os.Stderr + } + + // Create a logger + logger := log.New(config.LogOutput, "", log.LstdFlags) + + // Create the SerfQuery object. + sq := &SerfQuery{ + logger: logger, + serf: serf, + } + + return sq, nil +} + +func (c *SerfQuery) Query(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) { + // Prevent the use of the internal prefix + if strings.HasPrefix(name, serf.InternalQueryPrefix) { + // Allow the special "ping" query + if name != serf.InternalQueryPrefix+"ping" || payload != nil { + return nil, fmt.Errorf("Serf Queries cannot contain the '%s' prefix", serf.InternalQueryPrefix) + } + } + c.logger.Printf("[DEBUG] client: Requesting serf query send: %s. Payload: %#v", + name, string(payload)) + resp, err := c.serf.Query(name, payload, params) + if err != nil { + c.logger.Printf("[WARN] client: failed to start user serf query: %v", err) + } + return resp, err +} diff --git a/consul/server.go b/consul/server.go index d90e163a74ff..bf520b49367b 100644 --- a/consul/server.go +++ b/consul/server.go @@ -136,6 +136,9 @@ type Server struct { // which SHOULD only consist of Consul servers serfWAN *serf.Serf + // lanSerfQuery is used to perform 'reachability' tests on the LAN + lanSerfQuery *SerfQuery + // sessionTimers track the expiration time of each Session that has // a TTL. On expiration, a SessionDestroy event will occur, and // destroy the session via standard session destroy processing @@ -258,6 +261,13 @@ func NewServer(config *Config) (*Server, error) { } go s.lanEventHandler() + // Initialize the SerfQuery object + s.lanSerfQuery, err = NewSerfQuery(config, s.serfLAN) + if err != nil { + s.Shutdown() + return nil, fmt.Errorf("Failed to initialize lanSerfQuery: %v", err) + } + // Initialize the wan Serf s.serfWAN, err = s.setupSerf(config.SerfWANConfig, s.eventChWAN, serfWANSnapshot, true) @@ -716,3 +726,8 @@ func (s *Server) GetLANCoordinate() (*coordinate.Coordinate, error) { func (s *Server) GetWANCoordinate() (*coordinate.Coordinate, error) { return s.serfWAN.GetCoordinate() } + +func (s *Server) SerfQuery(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) { + qr, err := s.lanSerfQuery.Query(name, payload, params) + return qr, err +} diff --git a/scripts/test.sh b/scripts/test.sh index 7a751fe67241..42d7b7546130 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -1,13 +1,15 @@ #!/usr/bin/env bash +export PATH=${PATH}:${GOPATH}/src/github.com/mailgun/godebug + # Create a temp dir and clean it up on exit TEMPDIR=`mktemp -d -t consul-test.XXX` trap "rm -rf $TEMPDIR" EXIT HUP INT QUIT TERM # Build the Consul binary for the API tests echo "--> Building consul" -go build -o $TEMPDIR/consul || exit 1 +godebug build -o $TEMPDIR/consul || exit 1 # Run the tests echo "--> Running tests" -go list ./... | PATH=$TEMPDIR:$PATH xargs -n1 go test +go list ./... | PATH=$TEMPDIR:$PATH xargs -n1 godebug test diff --git a/website/Gemfile b/website/Gemfile index 2b35e281068a..d0e5a286b7bd 100644 --- a/website/Gemfile +++ b/website/Gemfile @@ -1,5 +1,5 @@ source "https://rubygems.org" -ruby "2.2.2" +ruby "2.2.3" gem "middleman-hashicorp", github: "hashicorp/middleman-hashicorp" From ad650d27895c8845fc4a3954abdb6a215a2d618a Mon Sep 17 00:00:00 2001 From: John Sullivan Date: Thu, 22 Oct 2015 09:21:16 -0400 Subject: [PATCH 09/13] Fix issue in which 'reachability' does not error when the non-zero number of missing acks from live nodes equals the number of acks from non-live nodes. --- command/reachability.go | 53 +++++++++++++++++++++++++---------------- 1 file changed, 33 insertions(+), 20 deletions(-) diff --git a/command/reachability.go b/command/reachability.go index 597b1f6fd112..457a1415518f 100644 --- a/command/reachability.go +++ b/command/reachability.go @@ -12,10 +12,10 @@ import ( ) const ( - tooManyAcks = `This could mean Serf is detecting false-failures due to a misconfiguration or network issue.` - tooFewAcks = `This could mean Serf gossip packets are being lost due to a misconfiguration or network issue.` - duplicateResponses = `Duplicate responses means there is a misconfiguration. Verify that node names are unique.` - troubleshooting = ` + nonLiveNodeAcks = `This could mean Serf is detecting false-failures due to a misconfiguration or network issue.` + liveNodeMissingAcks = `This could mean Serf gossip packets are being lost due to a misconfiguration or network issue.` + duplicateResponses = `Duplicate responses means there is a misconfiguration. Verify that node names are unique.` + troubleshooting = ` Troubleshooting tips: * Ensure that the bind addr:port is accessible by all other nodes * If an advertise address is set, ensure it routes to the bind address @@ -137,31 +137,44 @@ OUTER: exit = 1 } - n := len(liveMembers) - if numAcks == n { + // Ensure all live members responded. + liveNotResponding := make(map[string]bool) + for m := range liveMembers { + if _, ok := acksFrom[m]; !ok { + c.Ui.Output(fmt.Sprintf("Missing ack from: %s", m)) + liveNotResponding[m] = true + } + } + + // Ensure that no responses came from non-live nodes. + nonliveResponding := make(map[string]bool) + for m := range acksFrom { + if _, ok := liveMembers[m]; !ok { + c.Ui.Output(fmt.Sprintf("Received ack from non-live node: %s", m)) + nonliveResponding[m] = true + } + } + + if (len(liveNotResponding) == 0) && (len(nonliveResponding) == 0) { c.Ui.Output("Successfully contacted all live nodes") - } else if numAcks > n { - c.Ui.Output("Received more acks than live nodes! Acks from non-live nodes:") - for m := range acksFrom { - if _, ok := liveMembers[m]; !ok { + } else { + if len(nonliveResponding) != 0 { + c.Ui.Output("Acks from non-live nodes:") + for m := range nonliveResponding { c.Ui.Output(fmt.Sprintf("\t%s", m)) } + c.Ui.Output(nonLiveNodeAcks) } - c.Ui.Output(tooManyAcks) - c.Ui.Output(troubleshooting) - return 1 - - } else if numAcks < n { - c.Ui.Output("Received fewer acks than live nodes! Missing acks from:") - for m := range liveMembers { - if _, ok := acksFrom[m]; !ok { + if len(liveNotResponding) != 0 { + c.Ui.Output("Missing acks from:") + for m := range liveNotResponding { c.Ui.Output(fmt.Sprintf("\t%s", m)) } + c.Ui.Output(liveNodeMissingAcks) } - c.Ui.Output(tooFewAcks) c.Ui.Output(troubleshooting) - return 1 + exit = 1 } return exit } From 54c2ea548027534112e19b09a9d2224e89dbfba2 Mon Sep 17 00:00:00 2001 From: John Sullivan Date: Tue, 3 Nov 2015 15:24:59 -0500 Subject: [PATCH 10/13] Add 'ping' command to perform memberlist pings as a network diagnostic tool. --- command/agent/agent.go | 25 +++++++ command/agent/rpc.go | 29 +++++++ command/agent/rpc_client.go | 23 ++++++ command/ping.go | 108 +++++++++++++++++++++++++++ commands.go | 6 ++ consul/client.go | 17 +++-- consul/{serfquery.go => serfdiag.go} | 36 +++++++-- consul/server.go | 18 +++-- 8 files changed, 242 insertions(+), 20 deletions(-) create mode 100644 command/ping.go rename consul/{serfquery.go => serfdiag.go} (69%) diff --git a/command/agent/agent.go b/command/agent/agent.go index b120a0a15394..bfd6c995b913 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -1473,3 +1473,28 @@ func (a *Agent) SerfQuery(name string, payload []byte, params *serf.QueryParam) } return resp, err } + +// SerfPing sends a Memberlist Ping via Serf. +func (a *Agent) SerfPing(name string) (*serfPingResponse, error) { + a.logger.Printf("[DEBUG] agent: Requesting serf ping send: %s", name) + var err error + var cresp *consul.SerfPingResponse + params := consul.SerfPingParam{ + Name: name, + } + if a.server != nil { + cresp, err = a.server.SerfPing(¶ms) + } else { + cresp, err = a.client.SerfPing(¶ms) + } + if err != nil { + a.logger.Printf("[WARN] agent: failed to start user serf ping: %v", err) + return nil, err + } + + resp := serfPingResponse{ + Success: cresp.Success, + RTT: cresp.RTT, + } + return &resp, err +} diff --git a/command/agent/rpc.go b/command/agent/rpc.go index 612f65022647..60b5219cb33d 100644 --- a/command/agent/rpc.go +++ b/command/agent/rpc.go @@ -30,6 +30,7 @@ import ( "os" "strings" "sync" + "time" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/go-msgpack/codec" @@ -58,6 +59,7 @@ const ( removeKeyCommand = "remove-key" listKeysCommand = "list-keys" serfQueryCommand = "serf-query" + serfPingCommand = "serf-ping" ) const ( @@ -189,6 +191,11 @@ type memberEventRecord struct { Members []Member } +type serfPingResponse struct { + Success bool + RTT time.Duration +} + type AgentRPC struct { sync.Mutex agent *Agent @@ -414,6 +421,9 @@ func (i *AgentRPC) handleRequest(client *rpcClient, reqHeader *requestHeader) er case serfQueryCommand: return i.handleSerfQuery(client, seq) + case serfPingCommand: + return i.handleSerfPing(client, seq) + default: respHeader := responseHeader{Seq: seq, Error: unsupportedCommand} client.Send(&respHeader, nil) @@ -616,6 +626,25 @@ func (i *AgentRPC) handleSerfQuery(client *rpcClient, seq uint64) error { return client.Send(&resp, nil) } +func (i *AgentRPC) handleSerfPing(client *rpcClient, seq uint64) error { + var req serfPingRequest + if err := client.dec.Decode(&req); err != nil { + return fmt.Errorf("decode failed: %v", err) + } + + i.logger.Printf("[DEBUG] agent.rpc: handling serf ping request") + + // Start the serf query + resp, err := i.agent.SerfPing(req.Name) + + header := responseHeader{ + Seq: seq, + Error: errToString(err), + } + + return client.Send(&header, &resp) +} + func (i *AgentRPC) handleLeave(client *rpcClient, seq uint64) error { i.logger.Printf("[INFO] agent.rpc: Graceful leave triggered") diff --git a/command/agent/rpc_client.go b/command/agent/rpc_client.go index 21c65ec986ab..f430978d6c11 100644 --- a/command/agent/rpc_client.go +++ b/command/agent/rpc_client.go @@ -481,6 +481,29 @@ func (c *RPCClient) SerfQuery(params *consul.SerfQueryParam) error { } } +type serfPingRequest struct { + Name string +} + +// SerfPing initiates a new ping message using the given parameters +func (c *RPCClient) SerfPing(params *consul.SerfPingParam) (*serfPingResponse, error) { + // Setup the request + seq := c.getSeq() + header := requestHeader{ + Command: serfPingCommand, + Seq: seq, + } + req := serfPingRequest{ + Name: params.Name, + } + + var resp serfPingResponse + // Send the request + err := c.genericRPC(&header, &req, &resp) + + return &resp, err +} + // Stop is used to unsubscribe from logs or event streams func (c *RPCClient) Stop(handle StreamHandle) error { // Deregister locally first to stop delivery diff --git a/command/ping.go b/command/ping.go new file mode 100644 index 000000000000..b3a8813a7288 --- /dev/null +++ b/command/ping.go @@ -0,0 +1,108 @@ +package command + +import ( + "flag" + "fmt" + "github.com/hashicorp/consul/consul" + "github.com/mitchellh/cli" + "os" + "os/signal" + "strings" + "time" +) + +// PingCommand is a Command implementation that is used to initiate +// a series of Serf 'ping' messages to the specified client node +type PingCommand struct { + ShutdownCh <-chan struct{} + Ui cli.Ui +} + +func (c *PingCommand) Help() string { + helpText := ` +Usage: consul reachability [options] + + Issues a series of Serf 'ping' messages to the specified node + +Options: + + -rpc-addr=127.0.0.1:8400 RPC address of the Consul agent. + -count Number of pings to send (0 is infinite) + -node Name of node to ping +` + return strings.TrimSpace(helpText) +} + +func (c *PingCommand) Run(args []string) int { + var count int + var nodename string + exit := 0 + cmdFlags := flag.NewFlagSet("ping", flag.ContinueOnError) + cmdFlags.Usage = func() { c.Ui.Output(c.Help()) } + cmdFlags.IntVar(&count, "count", 0, "count") + cmdFlags.StringVar(&nodename, "node", "*", "name of node to ping") + rpcAddr := RPCAddrFlag(cmdFlags) + if err := cmdFlags.Parse(args); err != nil { + return 1 + } + + cl, err := RPCClient(*rpcAddr) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err)) + return 1 + } + defer cl.Close() + + success := 0 + latency := 0 * time.Millisecond + var sent int + c.Ui.Output("Starting serf ping...") + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt) + go func() { + <-sigChan + cleanup(sent, success, latency, c) + os.Exit(exit) + }() + for sent = 0; (count == 0) || (sent < count); sent++ { + if sent > 0 { + time.Sleep(time.Second) + } + // Start the query + params := consul.SerfPingParam{ + Name: nodename, + } + if resp, err := cl.SerfPing(¶ms); err != nil { + c.Ui.Error(fmt.Sprintf("Error sending serf ping: %s", err)) + exit = 1 + break + } else if resp.Success { + c.Ui.Output(fmt.Sprintf("Count %d: Node %s responded in %s", sent, nodename, resp.RTT)) + latency = latency + resp.RTT + success++ + } else { + c.Ui.Output(fmt.Sprintf("Count %d: Node %s failed to respond in %s", sent, nodename, resp.RTT)) + } + } + + if exit == 0 { + cleanup(sent, success, latency, c) + } + + return exit +} + +func cleanup(sent int, success int, latency time.Duration, c *PingCommand) { + var avgLatency string + if success > 0 { + avgLatency = fmt.Sprintf("%s", latency/time.Duration(success)) + } else { + avgLatency = "N/A" + } + c.Ui.Output(fmt.Sprintf("Statistics: total %d, success %d%%, avg latency: %s", + sent, (success * 100 / sent), avgLatency)) +} + +func (c *PingCommand) Synopsis() string { + return "Issue Serf 'ping' messages to specified node" +} diff --git a/commands.go b/commands.go index 5f501d6e822c..9961448e47bc 100644 --- a/commands.go +++ b/commands.go @@ -108,6 +108,12 @@ func init() { }, nil }, + "ping": func() (cli.Command, error) { + return &command.PingCommand{ + Ui: ui, + }, nil + }, + "reachability": func() (cli.Command, error) { return &command.ReachabilityCommand{ Ui: ui, diff --git a/consul/client.go b/consul/client.go index 4ea24951cfb5..c6c3d3150c5d 100644 --- a/consul/client.go +++ b/consul/client.go @@ -63,8 +63,8 @@ type Client struct { // which contains all the DC nodes serf *serf.Serf - // serfQuery is used to perform 'reachability' tests - serfQuery *SerfQuery + // serfDiag is used to perform Serf-based diagnostic tests + serfDiag *SerfDiag shutdown bool shutdownCh chan struct{} @@ -123,11 +123,11 @@ func NewClient(config *Config) (*Client, error) { return nil, fmt.Errorf("Failed to start lan serf: %v", err) } - // Initialize the SerfQuery object - c.serfQuery, err = NewSerfQuery(config, c.serf) + // Initialize the SerfDiag object + c.serfDiag, err = NewSerfDiag(config, c.serf) if err != nil { c.Shutdown() - return nil, fmt.Errorf("Failed to initialize serfQuery: %v", err) + return nil, fmt.Errorf("Failed to initialize serfDiag: %v", err) } return c, nil @@ -397,6 +397,11 @@ func (c *Client) GetCoordinate() (*coordinate.Coordinate, error) { } func (c *Client) SerfQuery(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) { - qr, err := c.serfQuery.Query(name, payload, params) + qr, err := c.serfDiag.Query(name, payload, params) return qr, err } + +func (c *Client) SerfPing(param *SerfPingParam) (*SerfPingResponse, error) { + resp, err := c.serfDiag.Ping(param) + return resp, err +} diff --git a/consul/serfquery.go b/consul/serfdiag.go similarity index 69% rename from consul/serfquery.go rename to consul/serfdiag.go index 8c14163f4730..836490a2c9ef 100644 --- a/consul/serfquery.go +++ b/consul/serfdiag.go @@ -28,9 +28,9 @@ type SerfQueryParam struct { RespCh chan<- NodeResponse // Channel to send responses on } -// SerfQuery handles 'reachability' Serf 'pings' for both the client -// and server. -type SerfQuery struct { +// SerfDiag handles Serf-based diagnostics for both the Consul server +// and the Consul client +type SerfDiag struct { // Logger uses the provided LogOutput logger *log.Logger @@ -39,7 +39,7 @@ type SerfQuery struct { serf *serf.Serf } -func NewSerfQuery(config *Config, serf *serf.Serf) (*SerfQuery, error) { +func NewSerfDiag(config *Config, serf *serf.Serf) (*SerfDiag, error) { // Check the protocol version if err := config.CheckVersion(); err != nil { return nil, err @@ -53,16 +53,16 @@ func NewSerfQuery(config *Config, serf *serf.Serf) (*SerfQuery, error) { // Create a logger logger := log.New(config.LogOutput, "", log.LstdFlags) - // Create the SerfQuery object. - sq := &SerfQuery{ + // Create the SerfDiag object. + sd := &SerfDiag{ logger: logger, serf: serf, } - return sq, nil + return sd, nil } -func (c *SerfQuery) Query(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) { +func (c *SerfDiag) Query(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) { // Prevent the use of the internal prefix if strings.HasPrefix(name, serf.InternalQueryPrefix) { // Allow the special "ping" query @@ -78,3 +78,23 @@ func (c *SerfQuery) Query(name string, payload []byte, params *serf.QueryParam) } return resp, err } + +// SerfPingParam is provided to customize various Serf Ping settings. +type SerfPingParam struct { + Name string // Name of node to ping +} + +type SerfPingResponse struct { + Success bool + RTT time.Duration +} + +func (c *SerfDiag) Ping(params *SerfPingParam) (*SerfPingResponse, error) { + c.logger.Printf("[DEBUG] client: Requesting serf ping send: %s", params.Name) + success, rtt, err := c.serf.Memberlist().Ping(params.Name) + resp := SerfPingResponse{ + Success: success, + RTT: rtt, + } + return &resp, err +} diff --git a/consul/server.go b/consul/server.go index bf520b49367b..49dbd898420f 100644 --- a/consul/server.go +++ b/consul/server.go @@ -136,8 +136,9 @@ type Server struct { // which SHOULD only consist of Consul servers serfWAN *serf.Serf - // lanSerfQuery is used to perform 'reachability' tests on the LAN - lanSerfQuery *SerfQuery + // lanSerfDiag is used to perform Serf-based diagnostic tests on + // the LAN + lanSerfDiag *SerfDiag // sessionTimers track the expiration time of each Session that has // a TTL. On expiration, a SessionDestroy event will occur, and @@ -261,11 +262,11 @@ func NewServer(config *Config) (*Server, error) { } go s.lanEventHandler() - // Initialize the SerfQuery object - s.lanSerfQuery, err = NewSerfQuery(config, s.serfLAN) + // Initialize the SerfDiag object + s.lanSerfDiag, err = NewSerfDiag(config, s.serfLAN) if err != nil { s.Shutdown() - return nil, fmt.Errorf("Failed to initialize lanSerfQuery: %v", err) + return nil, fmt.Errorf("Failed to initialize lanSerfDiag: %v", err) } // Initialize the wan Serf @@ -728,6 +729,11 @@ func (s *Server) GetWANCoordinate() (*coordinate.Coordinate, error) { } func (s *Server) SerfQuery(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) { - qr, err := s.lanSerfQuery.Query(name, payload, params) + qr, err := s.lanSerfDiag.Query(name, payload, params) return qr, err } + +func (s *Server) SerfPing(param *SerfPingParam) (*SerfPingResponse, error) { + resp, err := s.lanSerfDiag.Ping(param) + return resp, err +} From 12c3f6b261ef4e2f4433131f98167d7049372c4b Mon Sep 17 00:00:00 2001 From: John Sullivan Date: Wed, 4 Nov 2015 08:45:06 -0500 Subject: [PATCH 11/13] Incorporate feedback from Ryan Breen. --- command/agent/agent.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index bfd6c995b913..e28ceca2b04d 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -1496,5 +1496,5 @@ func (a *Agent) SerfPing(name string) (*serfPingResponse, error) { Success: cresp.Success, RTT: cresp.RTT, } - return &resp, err + return &resp, nil } From 468de5b3b1f6ef9b8879afd88f98a51c61800c1f Mon Sep 17 00:00:00 2001 From: John Sullivan Date: Wed, 4 Nov 2015 20:08:28 -0500 Subject: [PATCH 12/13] Handle error indicating no ping response, use Serf member list and not 'memberlist' member list for node info. --- consul/serfdiag.go | 32 +++++++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/consul/serfdiag.go b/consul/serfdiag.go index 836490a2c9ef..2df84bfb1e95 100644 --- a/consul/serfdiag.go +++ b/consul/serfdiag.go @@ -2,6 +2,7 @@ package consul import ( "fmt" + "github.com/hashicorp/memberlist" "github.com/hashicorp/serf/serf" "log" "os" @@ -91,10 +92,31 @@ type SerfPingResponse struct { func (c *SerfDiag) Ping(params *SerfPingParam) (*SerfPingResponse, error) { c.logger.Printf("[DEBUG] client: Requesting serf ping send: %s", params.Name) - success, rtt, err := c.serf.Memberlist().Ping(params.Name) - resp := SerfPingResponse{ - Success: success, - RTT: rtt, + var node *serf.Member + node = nil + for _, m := range c.serf.Members() { + if m.Name == params.Name { + node = &m + break + } + } + if node == nil { + return nil, fmt.Errorf("Member %s not found in data center.", params.Name) + } + if rtt, err := c.serf.Memberlist().Ping(node.Name, node.Addr, node.Port); err == nil { + return &SerfPingResponse{ + Success: true, + RTT: *rtt, + }, err + } else { + switch err.(type) { + case memberlist.NoPingResponseError: + return &SerfPingResponse{ + Success: false, + RTT: *rtt, + }, nil + default: + return nil, err + } } - return &resp, err } From 479bf0092731d21a15eaf00016d0cb2d7841e51b Mon Sep 17 00:00:00 2001 From: John Sullivan Date: Thu, 5 Nov 2015 14:59:46 -0500 Subject: [PATCH 13/13] Add documentation for 'ping' command. --- .../source/docs/commands/index.html.markdown | 1 + .../source/docs/commands/ping.html.markdown | 89 +++++++++++++++++++ website/source/layouts/docs.erb | 4 + 3 files changed, 94 insertions(+) create mode 100644 website/source/docs/commands/ping.html.markdown diff --git a/website/source/docs/commands/index.html.markdown b/website/source/docs/commands/index.html.markdown index 1f414d42adbc..7ae20c026ca6 100644 --- a/website/source/docs/commands/index.html.markdown +++ b/website/source/docs/commands/index.html.markdown @@ -38,6 +38,7 @@ Available commands are: lock Execute a command holding a lock members Lists the members of a Consul cluster monitor Stream logs from a Consul agent + ping Issue Serf 'ping' messages to specified node reachability Test network reachability reload Triggers the agent to reload configuration files rtt Estimates network round trip time between nodes diff --git a/website/source/docs/commands/ping.html.markdown b/website/source/docs/commands/ping.html.markdown new file mode 100644 index 000000000000..a95bf0db48e2 --- /dev/null +++ b/website/source/docs/commands/ping.html.markdown @@ -0,0 +1,89 @@ +--- +layout: "docs" +page_title: "Commands: Ping" +sidebar_current: "docs-commands-ping" +description: |- + The `ping` command issues Serf 'ping' messages to specified node and displays response statistics. +--- + +# Consul Ping + +Command: `consul ping` + +The `ping` command issues Serf 'ping' messages to the specified node. +If a count is specified, the test will issue the specified number of +packets to the node; if no count is specified, the test will continue +indefinitely until interrupted by CTRL-C. Once the test is finished +(or interrupted), it displays statistics describing the number of +packets sent, the number of responses received, the percentage of +successful 'pings', and the average latency or round trip time of all +successful responses. + +This can be used to troubleshoot configurations or network issues, since +nodes that are having issues responding to UDP traffic will either +intermittently respond or not respond at all. + +In general, the following troubleshooting tips are recommended: + +* Ensure that the bind addr:port is accessible by all other nodes +* If an advertise address is set, ensure it routes to the bind address +* Check that no nodes are behind a NAT +* If nodes are behind firewalls or iptables, check that Serf traffic is permitted (UDP and TCP) +* Verify networking equipment is functional + +## Usage + +Usage: `consul ping [options]` + +The following command-line options are available for this command. +Every option is optional: + +* `-rpc-addr` - Address to the RPC server of the agent you want to contact + to send this command. If this isn't specified, the command will contact + "127.0.0.1:8400" which is the default RPC address of a Consul + agent. This option can also be controlled using the `CONSUL_RPC_ADDR` + environment variable. + +* `-node` - Name of the node to ping + +* `-count` - Number of pings to issue to target node. If not + specified, continue issuing pings until interrupted by CTRL-C. + +## Output + +If the node exists in the data center, the specified number of ping +messages will be sent and statistics displayed upon completion: + +``` +$ ./consul ping -node test-node-1 -count 5 +Starting serf ping... +Count 0: Node test-node-1 responded in 785.244µs +Count 1: Node test-node-1 responded in 871.454µs +Count 2: Node test-node-1 responded in 714.955µs +Count 3: Node test-node-1 responded in 6.008223ms +Count 4: Node test-node-1 responded in 884.21µs +Statistics: total 5, success 100%, avg latency: 1.852817ms +``` + +If the node does not respong to all packets, the output indicates the +timeout value of ping messages for which no response was received: + +``` +$ ./consul ping -node test-node-5 -count 5 +Starting serf ping... +Count 0: Node test-node-5 responded in 858.761µs +Count 1: Node test-node-5 responded in 862.887µs +Count 2: Node test-node-5 responded in 857.277µs +Count 3: Node test-node-5 failed to respond in 500ms +Count 4: Node test-node-5 responded in 953.343µs +Statistics: total 5, success 80%, avg latency: 883.067µs +``` + +If the node is not a member of the data center, an error will be +displayed: + +``` +$ ./consul ping -node nodeDoesNotExist +Starting serf ping... +Error sending serf ping: Member nodeDoesNotExist not found in data center. +``` diff --git a/website/source/layouts/docs.erb b/website/source/layouts/docs.erb index 02a4dbdd8489..933776636023 100644 --- a/website/source/layouts/docs.erb +++ b/website/source/layouts/docs.erb @@ -122,6 +122,10 @@ info + > + ping + + > reachability