diff --git a/command/agent/agent.go b/command/agent/agent.go index 14db1ea3de70..e28ceca2b04d 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -1456,3 +1456,45 @@ func (a *Agent) DisableNodeMaintenance() { a.RemoveCheck(nodeMaintCheckID, true) a.logger.Printf("[INFO] agent: Node left maintenance mode") } + +// SerfQuery sends a Query on Serf, see Serf.Query. +func (a *Agent) SerfQuery(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) { + a.logger.Printf("[DEBUG] agent: Requesting serf query send: %s. Payload: %#v", + name, string(payload)) + var resp *serf.QueryResponse + var err error + if a.server != nil { + resp, err = a.server.SerfQuery(name, payload, params) + } else { + resp, err = a.client.SerfQuery(name, payload, params) + } + if err != nil { + a.logger.Printf("[WARN] agent: failed to start user serf query: %v", err) + } + return resp, err +} + +// SerfPing sends a Memberlist Ping via Serf. +func (a *Agent) SerfPing(name string) (*serfPingResponse, error) { + a.logger.Printf("[DEBUG] agent: Requesting serf ping send: %s", name) + var err error + var cresp *consul.SerfPingResponse + params := consul.SerfPingParam{ + Name: name, + } + if a.server != nil { + cresp, err = a.server.SerfPing(¶ms) + } else { + cresp, err = a.client.SerfPing(¶ms) + } + if err != nil { + a.logger.Printf("[WARN] agent: failed to start user serf ping: %v", err) + return nil, err + } + + resp := serfPingResponse{ + Success: cresp.Success, + RTT: cresp.RTT, + } + return &resp, nil +} diff --git a/command/agent/ipc_serf_query_response_stream.go b/command/agent/ipc_serf_query_response_stream.go new file mode 100644 index 000000000000..b2b86602df58 --- /dev/null +++ b/command/agent/ipc_serf_query_response_stream.go @@ -0,0 +1,91 @@ +package agent + +import ( + "github.com/hashicorp/serf/serf" + "log" + "time" +) + +// serfQueryResponseStream is used to stream the serf query results back to a client +type serfQueryResponseStream struct { + client streamClient + logger *log.Logger + seq uint64 +} + +func newSerfQueryResponseStream(client streamClient, seq uint64, logger *log.Logger) *serfQueryResponseStream { + qs := &serfQueryResponseStream{ + client: client, + logger: logger, + seq: seq, + } + return qs +} + +// Stream is a long running routine used to stream the results of a serf query back to a client +func (qs *serfQueryResponseStream) Stream(resp *serf.QueryResponse) { + // Setup a timer for the serf query ending + remaining := resp.Deadline().Sub(time.Now()) + done := time.After(remaining) + + ackCh := resp.AckCh() + respCh := resp.ResponseCh() + for { + select { + case a := <-ackCh: + if err := qs.sendAck(a); err != nil { + qs.logger.Printf("[ERR] agent.ipc: Failed to stream serf query ack to %v: %v", qs.client, err) + return + } + case r := <-respCh: + if err := qs.sendResponse(r.From, r.Payload); err != nil { + qs.logger.Printf("[ERR] agent.ipc: Failed to stream serf query response to %v: %v", qs.client, err) + return + } + case <-done: + if err := qs.sendDone(); err != nil { + qs.logger.Printf("[ERR] agent.ipc: Failed to stream serf query end to %v: %v", qs.client, err) + } + return + } + } +} + +// sendAck is used to send a single ack +func (qs *serfQueryResponseStream) sendAck(from string) error { + header := responseHeader{ + Seq: qs.seq, + Error: "", + } + rec := serfQueryRecord{ + Type: serfQueryRecordAck, + From: from, + } + return qs.client.Send(&header, &rec) +} + +// sendResponse is used to send a single response +func (qs *serfQueryResponseStream) sendResponse(from string, payload []byte) error { + header := responseHeader{ + Seq: qs.seq, + Error: "", + } + rec := serfQueryRecord{ + Type: serfQueryRecordResponse, + From: from, + Payload: payload, + } + return qs.client.Send(&header, &rec) +} + +// sendDone is used to signal the end +func (qs *serfQueryResponseStream) sendDone() error { + header := responseHeader{ + Seq: qs.seq, + Error: "", + } + rec := serfQueryRecord{ + Type: serfQueryRecordDone, + } + return qs.client.Send(&header, &rec) +} diff --git a/command/agent/rpc.go b/command/agent/rpc.go index dd2d376c60e9..60b5219cb33d 100644 --- a/command/agent/rpc.go +++ b/command/agent/rpc.go @@ -30,6 +30,7 @@ import ( "os" "strings" "sync" + "time" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/go-msgpack/codec" @@ -57,6 +58,8 @@ const ( useKeyCommand = "use-key" removeKeyCommand = "remove-key" listKeysCommand = "list-keys" + serfQueryCommand = "serf-query" + serfPingCommand = "serf-ping" ) const ( @@ -188,6 +191,11 @@ type memberEventRecord struct { Members []Member } +type serfPingResponse struct { + Success bool + RTT time.Duration +} + type AgentRPC struct { sync.Mutex agent *Agent @@ -410,6 +418,12 @@ func (i *AgentRPC) handleRequest(client *rpcClient, reqHeader *requestHeader) er case installKeyCommand, useKeyCommand, removeKeyCommand, listKeysCommand: return i.handleKeyring(client, seq, command, token) + case serfQueryCommand: + return i.handleSerfQuery(client, seq) + + case serfPingCommand: + return i.handleSerfPing(client, seq) + default: respHeader := responseHeader{Seq: seq, Error: unsupportedCommand} client.Send(&respHeader, nil) @@ -578,6 +592,59 @@ func (i *AgentRPC) handleStop(client *rpcClient, seq uint64) error { return client.Send(&resp, nil) } +func (i *AgentRPC) handleSerfQuery(client *rpcClient, seq uint64) error { + var req serfQueryRequest + if err := client.dec.Decode(&req); err != nil { + return fmt.Errorf("decode failed: %v", err) + } + + // Setup the serf query + params := serf.QueryParam{ + FilterNodes: req.FilterNodes, + FilterTags: req.FilterTags, + RequestAck: req.RequestAck, + Timeout: req.Timeout, + } + + // Start the serf query + serfQueryResp, err := i.agent.SerfQuery(req.Name, req.Payload, ¶ms) + + // Stream the serf query responses + if err == nil { + qs := newSerfQueryResponseStream(client, seq, i.logger) + defer func() { + go qs.Stream(serfQueryResp) + }() + } + + // Respond + resp := responseHeader{ + Seq: seq, + Error: errToString(err), + } + + return client.Send(&resp, nil) +} + +func (i *AgentRPC) handleSerfPing(client *rpcClient, seq uint64) error { + var req serfPingRequest + if err := client.dec.Decode(&req); err != nil { + return fmt.Errorf("decode failed: %v", err) + } + + i.logger.Printf("[DEBUG] agent.rpc: handling serf ping request") + + // Start the serf query + resp, err := i.agent.SerfPing(req.Name) + + header := responseHeader{ + Seq: seq, + Error: errToString(err), + } + + return client.Send(&header, &resp) +} + func (i *AgentRPC) handleLeave(client *rpcClient, seq uint64) error { i.logger.Printf("[INFO] agent.rpc: Graceful leave triggered") diff --git a/command/agent/rpc_client.go b/command/agent/rpc_client.go index 3ce90b1634c3..f430978d6c11 100644 --- a/command/agent/rpc_client.go +++ b/command/agent/rpc_client.go @@ -3,6 +3,7 @@ package agent import ( "bufio" "fmt" + "github.com/hashicorp/consul/consul" "github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/logutils" "log" @@ -11,6 +12,7 @@ import ( "strings" "sync" "sync/atomic" + "time" ) var ( @@ -345,6 +347,163 @@ func (c *RPCClient) Monitor(level logutils.LogLevel, ch chan<- string) (StreamHa } } +type serfQueryHandler struct { + client *RPCClient + closed bool + init bool + initCh chan<- error + ackCh chan<- string + respCh chan<- consul.NodeResponse + seq uint64 +} + +func (qh *serfQueryHandler) Handle(resp *responseHeader) { + // Initialize on the first response + if !qh.init { + qh.init = true + qh.initCh <- strToError(resp.Error) + return + } + + // Decode the query response + var rec serfQueryRecord + if err := qh.client.dec.Decode(&rec); err != nil { + log.Printf("[ERR] Failed to decode serf query response: %v", err) + qh.client.deregisterHandler(qh.seq) + return + } + + switch rec.Type { + case serfQueryRecordAck: + select { + case qh.ackCh <- rec.From: + default: + log.Printf("[ERR] Dropping serf query ack, channel full") + } + + case serfQueryRecordResponse: + select { + case qh.respCh <- consul.NodeResponse{rec.From, rec.Payload}: + default: + log.Printf("[ERR] Dropping serf query response, channel full") + } + + case serfQueryRecordDone: + // No further records coming + qh.client.deregisterHandler(qh.seq) + + default: + log.Printf("[ERR] Unrecognized serf query record type: %s", rec.Type) + } +} + +func (qh *serfQueryHandler) Cleanup() { + if !qh.closed { + if !qh.init { + qh.init = true + qh.initCh <- fmt.Errorf("Stream closed") + } + if qh.ackCh != nil { + close(qh.ackCh) + } + if qh.respCh != nil { + close(qh.respCh) + } + qh.closed = true + } +} + +const ( + serfQueryRecordAck = "ack" + serfQueryRecordResponse = "response" + serfQueryRecordDone = "done" +) + +type serfQueryRecord struct { + Type string + From string + Payload []byte +} + +type serfQueryRequest struct { + FilterNodes []string + FilterTags map[string]string + RequestAck bool + Timeout time.Duration + Name string + Payload []byte +} + +// SerfQuery initiates a new query message using the given parameters, +// and streams acks and responses over the given channels. The +// channels will not block on sends and should be buffered. At the end +// of the query, the channels will be closed. +func (c *RPCClient) SerfQuery(params *consul.SerfQueryParam) error { + // Setup the request + seq := c.getSeq() + header := requestHeader{ + Command: serfQueryCommand, + Seq: seq, + } + req := serfQueryRequest{ + FilterNodes: params.FilterNodes, + FilterTags: params.FilterTags, + RequestAck: params.RequestAck, + Timeout: params.Timeout, + Name: params.Name, + Payload: params.Payload, + } + + // Create a query handler + initCh := make(chan error, 1) + handler := &serfQueryHandler{ + client: c, + initCh: initCh, + ackCh: params.AckCh, + respCh: params.RespCh, + seq: seq, + } + c.handleSeq(seq, handler) + + // Send the request + if err := c.send(&header, &req); err != nil { + c.deregisterHandler(seq) + return err + } + + // Wait for a response + select { + case err := <-initCh: + return err + case <-c.shutdownCh: + c.deregisterHandler(seq) + return clientClosed + } +} + +type serfPingRequest struct { + Name string +} + +// SerfPing initiates a new ping message using the given parameters +func (c *RPCClient) SerfPing(params *consul.SerfPingParam) (*serfPingResponse, error) { + // Setup the request + seq := c.getSeq() + header := requestHeader{ + Command: serfPingCommand, + Seq: seq, + } + req := serfPingRequest{ + Name: params.Name, + } + + var resp serfPingResponse + // Send the request + err := c.genericRPC(&header, &req, &resp) + + return &resp, err +} + // Stop is used to unsubscribe from logs or event streams func (c *RPCClient) Stop(handle StreamHandle) error { // Deregister locally first to stop delivery diff --git a/command/ping.go b/command/ping.go new file mode 100644 index 000000000000..b3a8813a7288 --- /dev/null +++ b/command/ping.go @@ -0,0 +1,108 @@ +package command + +import ( + "flag" + "fmt" + "github.com/hashicorp/consul/consul" + "github.com/mitchellh/cli" + "os" + "os/signal" + "strings" + "time" +) + +// PingCommand is a Command implementation that is used to initiate +// a series of Serf 'ping' messages to the specified client node +type PingCommand struct { + ShutdownCh <-chan struct{} + Ui cli.Ui +} + +func (c *PingCommand) Help() string { + helpText := ` +Usage: consul reachability [options] + + Issues a series of Serf 'ping' messages to the specified node + +Options: + + -rpc-addr=127.0.0.1:8400 RPC address of the Consul agent. + -count Number of pings to send (0 is infinite) + -node Name of node to ping +` + return strings.TrimSpace(helpText) +} + +func (c *PingCommand) Run(args []string) int { + var count int + var nodename string + exit := 0 + cmdFlags := flag.NewFlagSet("ping", flag.ContinueOnError) + cmdFlags.Usage = func() { c.Ui.Output(c.Help()) } + cmdFlags.IntVar(&count, "count", 0, "count") + cmdFlags.StringVar(&nodename, "node", "*", "name of node to ping") + rpcAddr := RPCAddrFlag(cmdFlags) + if err := cmdFlags.Parse(args); err != nil { + return 1 + } + + cl, err := RPCClient(*rpcAddr) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err)) + return 1 + } + defer cl.Close() + + success := 0 + latency := 0 * time.Millisecond + var sent int + c.Ui.Output("Starting serf ping...") + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt) + go func() { + <-sigChan + cleanup(sent, success, latency, c) + os.Exit(exit) + }() + for sent = 0; (count == 0) || (sent < count); sent++ { + if sent > 0 { + time.Sleep(time.Second) + } + // Start the query + params := consul.SerfPingParam{ + Name: nodename, + } + if resp, err := cl.SerfPing(¶ms); err != nil { + c.Ui.Error(fmt.Sprintf("Error sending serf ping: %s", err)) + exit = 1 + break + } else if resp.Success { + c.Ui.Output(fmt.Sprintf("Count %d: Node %s responded in %s", sent, nodename, resp.RTT)) + latency = latency + resp.RTT + success++ + } else { + c.Ui.Output(fmt.Sprintf("Count %d: Node %s failed to respond in %s", sent, nodename, resp.RTT)) + } + } + + if exit == 0 { + cleanup(sent, success, latency, c) + } + + return exit +} + +func cleanup(sent int, success int, latency time.Duration, c *PingCommand) { + var avgLatency string + if success > 0 { + avgLatency = fmt.Sprintf("%s", latency/time.Duration(success)) + } else { + avgLatency = "N/A" + } + c.Ui.Output(fmt.Sprintf("Statistics: total %d, success %d%%, avg latency: %s", + sent, (success * 100 / sent), avgLatency)) +} + +func (c *PingCommand) Synopsis() string { + return "Issue Serf 'ping' messages to specified node" +} diff --git a/command/reachability.go b/command/reachability.go new file mode 100644 index 000000000000..457a1415518f --- /dev/null +++ b/command/reachability.go @@ -0,0 +1,184 @@ +package command + +import ( + "flag" + "fmt" + "github.com/hashicorp/consul/command/agent" + "github.com/hashicorp/consul/consul" + "github.com/hashicorp/serf/serf" + "github.com/mitchellh/cli" + "strings" + "time" +) + +const ( + nonLiveNodeAcks = `This could mean Serf is detecting false-failures due to a misconfiguration or network issue.` + liveNodeMissingAcks = `This could mean Serf gossip packets are being lost due to a misconfiguration or network issue.` + duplicateResponses = `Duplicate responses means there is a misconfiguration. Verify that node names are unique.` + troubleshooting = ` +Troubleshooting tips: +* Ensure that the bind addr:port is accessible by all other nodes +* If an advertise address is set, ensure it routes to the bind address +* Check that no nodes are behind a NAT +* If nodes are behind firewalls or iptables, check that Serf traffic is permitted (UDP and TCP) +* Verify networking equipment is functional` +) + +// ReachabilityCommand is a Command implementation that is used to trigger +// a new reachability test +type ReachabilityCommand struct { + ShutdownCh <-chan struct{} + Ui cli.Ui +} + +func (c *ReachabilityCommand) Help() string { + helpText := ` +Usage: consul reachability [options] + + Tests the network reachability of this node + +Options: + + -rpc-addr=127.0.0.1:8400 RPC address of the Consul agent. + -verbose Verbose mode +` + return strings.TrimSpace(helpText) +} + +func (c *ReachabilityCommand) Run(args []string) int { + var verbose bool + cmdFlags := flag.NewFlagSet("reachability", flag.ContinueOnError) + cmdFlags.Usage = func() { c.Ui.Output(c.Help()) } + cmdFlags.BoolVar(&verbose, "verbose", false, "verbose mode") + rpcAddr := RPCAddrFlag(cmdFlags) + if err := cmdFlags.Parse(args); err != nil { + return 1 + } + + cl, err := RPCClient(*rpcAddr) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err)) + return 1 + } + defer cl.Close() + + ackCh := make(chan string, 128) + + // Get the list of LAN members + var members []agent.Member + members, err = cl.LANMembers() + if err != nil { + c.Ui.Error(fmt.Sprintf("Error getting members: %s", err)) + return 1 + } + + // Get only the live members + liveMembers := make(map[string]struct{}) + for _, m := range members { + if m.Status == "alive" { + liveMembers[m.Name] = struct{}{} + } + } + c.Ui.Output(fmt.Sprintf("Total members: %d, live members: %d", len(members), len(liveMembers))) + + // Start the query + params := consul.SerfQueryParam{ + RequestAck: true, + Name: serf.InternalQueryPrefix + "ping", + AckCh: ackCh, + } + if err := cl.SerfQuery(¶ms); err != nil { + c.Ui.Error(fmt.Sprintf("Error sending serf query: %s", err)) + return 1 + } + c.Ui.Output("Starting reachability test...") + start := time.Now() + last := time.Now() + + // Track responses and acknowledgements + exit := 0 + dups := false + numAcks := 0 + acksFrom := make(map[string]struct{}, len(members)) + +OUTER: + for { + select { + case a := <-ackCh: + if a == "" { + break OUTER + } + if verbose { + c.Ui.Output(fmt.Sprintf("\tAck from '%s'", a)) + } + numAcks++ + if _, ok := acksFrom[a]; ok { + dups = true + c.Ui.Output(fmt.Sprintf("Duplicate response from '%v'", a)) + } + acksFrom[a] = struct{}{} + last = time.Now() + + case <-c.ShutdownCh: + c.Ui.Error("Test interrupted") + return 1 + } + } + + if verbose { + total := float64(time.Now().Sub(start)) / float64(time.Second) + timeToLast := float64(last.Sub(start)) / float64(time.Second) + c.Ui.Output(fmt.Sprintf("Query time: %0.2f sec, time to last response: %0.2f sec", total, timeToLast)) + } + + // Print troubleshooting info for duplicate responses + if dups { + c.Ui.Output(duplicateResponses) + exit = 1 + } + + // Ensure all live members responded. + liveNotResponding := make(map[string]bool) + for m := range liveMembers { + if _, ok := acksFrom[m]; !ok { + c.Ui.Output(fmt.Sprintf("Missing ack from: %s", m)) + liveNotResponding[m] = true + } + } + + // Ensure that no responses came from non-live nodes. + nonliveResponding := make(map[string]bool) + for m := range acksFrom { + if _, ok := liveMembers[m]; !ok { + c.Ui.Output(fmt.Sprintf("Received ack from non-live node: %s", m)) + nonliveResponding[m] = true + } + } + + if (len(liveNotResponding) == 0) && (len(nonliveResponding) == 0) { + c.Ui.Output("Successfully contacted all live nodes") + + } else { + if len(nonliveResponding) != 0 { + c.Ui.Output("Acks from non-live nodes:") + for m := range nonliveResponding { + c.Ui.Output(fmt.Sprintf("\t%s", m)) + } + c.Ui.Output(nonLiveNodeAcks) + } + if len(liveNotResponding) != 0 { + c.Ui.Output("Missing acks from:") + for m := range liveNotResponding { + c.Ui.Output(fmt.Sprintf("\t%s", m)) + } + c.Ui.Output(liveNodeMissingAcks) + } + c.Ui.Output(troubleshooting) + exit = 1 + } + return exit +} + +func (c *ReachabilityCommand) Synopsis() string { + return "Test network reachability" +} diff --git a/command/reachability_test.go b/command/reachability_test.go new file mode 100644 index 000000000000..77b3c055d030 --- /dev/null +++ b/command/reachability_test.go @@ -0,0 +1,32 @@ +package command + +import ( + "github.com/mitchellh/cli" + "strings" + "testing" +) + +func TestReachabilityCommand_implements(t *testing.T) { + var _ cli.Command = &ReachabilityCommand{} +} + +func TestReachabilityCommand_Run(t *testing.T) { + a1 := testAgent(t) + defer a1.Shutdown() + + ui := new(cli.MockUi) + c := &ReachabilityCommand{Ui: ui} + args := []string{"-rpc-addr=" + a1.addr, "-verbose"} + + code := c.Run(args) + if code != 0 { + t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String()) + } + + if !strings.Contains(ui.OutputWriter.String(), a1.config.NodeName) { + t.Fatalf("bad: %#v", ui.OutputWriter.String()) + } + if !strings.Contains(ui.OutputWriter.String(), "Successfully") { + t.Fatalf("bad: %#v", ui.OutputWriter.String()) + } +} diff --git a/commands.go b/commands.go index 57c640ab5a7f..9961448e47bc 100644 --- a/commands.go +++ b/commands.go @@ -108,6 +108,18 @@ func init() { }, nil }, + "ping": func() (cli.Command, error) { + return &command.PingCommand{ + Ui: ui, + }, nil + }, + + "reachability": func() (cli.Command, error) { + return &command.ReachabilityCommand{ + Ui: ui, + }, nil + }, + "reload": func() (cli.Command, error) { return &command.ReloadCommand{ Ui: ui, diff --git a/consul/client.go b/consul/client.go index b3fdb080e880..c6c3d3150c5d 100644 --- a/consul/client.go +++ b/consul/client.go @@ -63,6 +63,9 @@ type Client struct { // which contains all the DC nodes serf *serf.Serf + // serfDiag is used to perform Serf-based diagnostic tests + serfDiag *SerfDiag + shutdown bool shutdownCh chan struct{} shutdownLock sync.Mutex @@ -119,6 +122,14 @@ func NewClient(config *Config) (*Client, error) { c.Shutdown() return nil, fmt.Errorf("Failed to start lan serf: %v", err) } + + // Initialize the SerfDiag object + c.serfDiag, err = NewSerfDiag(config, c.serf) + if err != nil { + c.Shutdown() + return nil, fmt.Errorf("Failed to initialize serfDiag: %v", err) + } + return c, nil } @@ -384,3 +395,13 @@ func (c *Client) Stats() map[string]map[string]string { func (c *Client) GetCoordinate() (*coordinate.Coordinate, error) { return c.serf.GetCoordinate() } + +func (c *Client) SerfQuery(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) { + qr, err := c.serfDiag.Query(name, payload, params) + return qr, err +} + +func (c *Client) SerfPing(param *SerfPingParam) (*SerfPingResponse, error) { + resp, err := c.serfDiag.Ping(param) + return resp, err +} diff --git a/consul/serfdiag.go b/consul/serfdiag.go new file mode 100644 index 000000000000..2df84bfb1e95 --- /dev/null +++ b/consul/serfdiag.go @@ -0,0 +1,122 @@ +package consul + +import ( + "fmt" + "github.com/hashicorp/memberlist" + "github.com/hashicorp/serf/serf" + "log" + "os" + "strings" + "time" +) + +// NodeResponse is used to return the response of a serf query +type NodeResponse struct { + From string + Payload []byte +} + +// SerfQueryParam is provided to customize various serf query +// settings. +type SerfQueryParam struct { + FilterNodes []string // A list of node names to restrict query to + FilterTags map[string]string // A map of tag name to regex to filter on + RequestAck bool // Should nodes ack the query receipt + Timeout time.Duration // Maximum query duration. Optional, will be set automatically. + Name string // Opaque query name + Payload []byte // Opaque query payload + AckCh chan<- string // Channel to send Ack replies on + RespCh chan<- NodeResponse // Channel to send responses on +} + +// SerfDiag handles Serf-based diagnostics for both the Consul server +// and the Consul client +type SerfDiag struct { + // Logger uses the provided LogOutput + logger *log.Logger + + // serf is the Serf cluster maintained inside the DC + // which contains all the DC nodes + serf *serf.Serf +} + +func NewSerfDiag(config *Config, serf *serf.Serf) (*SerfDiag, error) { + // Check the protocol version + if err := config.CheckVersion(); err != nil { + return nil, err + } + + // Ensure we have a log output + if config.LogOutput == nil { + config.LogOutput = os.Stderr + } + + // Create a logger + logger := log.New(config.LogOutput, "", log.LstdFlags) + + // Create the SerfDiag object. + sd := &SerfDiag{ + logger: logger, + serf: serf, + } + + return sd, nil +} + +func (c *SerfDiag) Query(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) { + // Prevent the use of the internal prefix + if strings.HasPrefix(name, serf.InternalQueryPrefix) { + // Allow the special "ping" query + if name != serf.InternalQueryPrefix+"ping" || payload != nil { + return nil, fmt.Errorf("Serf Queries cannot contain the '%s' prefix", serf.InternalQueryPrefix) + } + } + c.logger.Printf("[DEBUG] client: Requesting serf query send: %s. Payload: %#v", + name, string(payload)) + resp, err := c.serf.Query(name, payload, params) + if err != nil { + c.logger.Printf("[WARN] client: failed to start user serf query: %v", err) + } + return resp, err +} + +// SerfPingParam is provided to customize various Serf Ping settings. +type SerfPingParam struct { + Name string // Name of node to ping +} + +type SerfPingResponse struct { + Success bool + RTT time.Duration +} + +func (c *SerfDiag) Ping(params *SerfPingParam) (*SerfPingResponse, error) { + c.logger.Printf("[DEBUG] client: Requesting serf ping send: %s", params.Name) + var node *serf.Member + node = nil + for _, m := range c.serf.Members() { + if m.Name == params.Name { + node = &m + break + } + } + if node == nil { + return nil, fmt.Errorf("Member %s not found in data center.", params.Name) + } + if rtt, err := c.serf.Memberlist().Ping(node.Name, node.Addr, node.Port); err == nil { + return &SerfPingResponse{ + Success: true, + RTT: *rtt, + }, err + } else { + switch err.(type) { + case memberlist.NoPingResponseError: + return &SerfPingResponse{ + Success: false, + RTT: *rtt, + }, nil + default: + return nil, err + } + } +} diff --git a/consul/server.go b/consul/server.go index d90e163a74ff..49dbd898420f 100644 --- a/consul/server.go +++ b/consul/server.go @@ -136,6 +136,10 @@ type Server struct { // which SHOULD only consist of Consul servers serfWAN *serf.Serf + // lanSerfDiag is used to perform Serf-based diagnostic tests on + // the LAN + lanSerfDiag *SerfDiag + // sessionTimers track the expiration time of each Session that has // a TTL. On expiration, a SessionDestroy event will occur, and // destroy the session via standard session destroy processing @@ -258,6 +262,13 @@ func NewServer(config *Config) (*Server, error) { } go s.lanEventHandler() + // Initialize the SerfDiag object + s.lanSerfDiag, err = NewSerfDiag(config, s.serfLAN) + if err != nil { + s.Shutdown() + return nil, fmt.Errorf("Failed to initialize lanSerfDiag: %v", err) + } + // Initialize the wan Serf s.serfWAN, err = s.setupSerf(config.SerfWANConfig, s.eventChWAN, serfWANSnapshot, true) @@ -716,3 +727,13 @@ func (s *Server) GetLANCoordinate() (*coordinate.Coordinate, error) { func (s *Server) GetWANCoordinate() (*coordinate.Coordinate, error) { return s.serfWAN.GetCoordinate() } + +func (s *Server) SerfQuery(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) { + qr, err := s.lanSerfDiag.Query(name, payload, params) + return qr, err +} + +func (s *Server) SerfPing(param *SerfPingParam) (*SerfPingResponse, error) { + resp, err := s.lanSerfDiag.Ping(param) + return resp, err +} diff --git a/scripts/test.sh b/scripts/test.sh index 7a751fe67241..42d7b7546130 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -1,13 +1,15 @@ #!/usr/bin/env bash +export PATH=${PATH}:${GOPATH}/src/github.com/mailgun/godebug + # Create a temp dir and clean it up on exit TEMPDIR=`mktemp -d -t consul-test.XXX` trap "rm -rf $TEMPDIR" EXIT HUP INT QUIT TERM # Build the Consul binary for the API tests echo "--> Building consul" -go build -o $TEMPDIR/consul || exit 1 +godebug build -o $TEMPDIR/consul || exit 1 # Run the tests echo "--> Running tests" -go list ./... | PATH=$TEMPDIR:$PATH xargs -n1 go test +go list ./... | PATH=$TEMPDIR:$PATH xargs -n1 godebug test diff --git a/website/Gemfile b/website/Gemfile index 2b35e281068a..d0e5a286b7bd 100644 --- a/website/Gemfile +++ b/website/Gemfile @@ -1,5 +1,5 @@ source "https://rubygems.org" -ruby "2.2.2" +ruby "2.2.3" gem "middleman-hashicorp", github: "hashicorp/middleman-hashicorp" diff --git a/website/source/docs/commands/index.html.markdown b/website/source/docs/commands/index.html.markdown index e466a94b4460..7ae20c026ca6 100644 --- a/website/source/docs/commands/index.html.markdown +++ b/website/source/docs/commands/index.html.markdown @@ -38,6 +38,8 @@ Available commands are: lock Execute a command holding a lock members Lists the members of a Consul cluster monitor Stream logs from a Consul agent + ping Issue Serf 'ping' messages to specified node + reachability Test network reachability reload Triggers the agent to reload configuration files rtt Estimates network round trip time between nodes version Prints the Consul version diff --git a/website/source/docs/commands/ping.html.markdown b/website/source/docs/commands/ping.html.markdown new file mode 100644 index 000000000000..a95bf0db48e2 --- /dev/null +++ b/website/source/docs/commands/ping.html.markdown @@ -0,0 +1,89 @@ +--- +layout: "docs" +page_title: "Commands: Ping" +sidebar_current: "docs-commands-ping" +description: |- + The `ping` command issues Serf 'ping' messages to specified node and displays response statistics. +--- + +# Consul Ping + +Command: `consul ping` + +The `ping` command issues Serf 'ping' messages to the specified node. +If a count is specified, the test will issue the specified number of +packets to the node; if no count is specified, the test will continue +indefinitely until interrupted by CTRL-C. Once the test is finished +(or interrupted), it displays statistics describing the number of +packets sent, the number of responses received, the percentage of +successful 'pings', and the average latency or round trip time of all +successful responses. + +This can be used to troubleshoot configurations or network issues, since +nodes that are having issues responding to UDP traffic will either +intermittently respond or not respond at all. + +In general, the following troubleshooting tips are recommended: + +* Ensure that the bind addr:port is accessible by all other nodes +* If an advertise address is set, ensure it routes to the bind address +* Check that no nodes are behind a NAT +* If nodes are behind firewalls or iptables, check that Serf traffic is permitted (UDP and TCP) +* Verify networking equipment is functional + +## Usage + +Usage: `consul ping [options]` + +The following command-line options are available for this command. +Every option is optional: + +* `-rpc-addr` - Address to the RPC server of the agent you want to contact + to send this command. If this isn't specified, the command will contact + "127.0.0.1:8400" which is the default RPC address of a Consul + agent. This option can also be controlled using the `CONSUL_RPC_ADDR` + environment variable. + +* `-node` - Name of the node to ping + +* `-count` - Number of pings to issue to target node. If not + specified, continue issuing pings until interrupted by CTRL-C. + +## Output + +If the node exists in the data center, the specified number of ping +messages will be sent and statistics displayed upon completion: + +``` +$ ./consul ping -node test-node-1 -count 5 +Starting serf ping... +Count 0: Node test-node-1 responded in 785.244µs +Count 1: Node test-node-1 responded in 871.454µs +Count 2: Node test-node-1 responded in 714.955µs +Count 3: Node test-node-1 responded in 6.008223ms +Count 4: Node test-node-1 responded in 884.21µs +Statistics: total 5, success 100%, avg latency: 1.852817ms +``` + +If the node does not respong to all packets, the output indicates the +timeout value of ping messages for which no response was received: + +``` +$ ./consul ping -node test-node-5 -count 5 +Starting serf ping... +Count 0: Node test-node-5 responded in 858.761µs +Count 1: Node test-node-5 responded in 862.887µs +Count 2: Node test-node-5 responded in 857.277µs +Count 3: Node test-node-5 failed to respond in 500ms +Count 4: Node test-node-5 responded in 953.343µs +Statistics: total 5, success 80%, avg latency: 883.067µs +``` + +If the node is not a member of the data center, an error will be +displayed: + +``` +$ ./consul ping -node nodeDoesNotExist +Starting serf ping... +Error sending serf ping: Member nodeDoesNotExist not found in data center. +``` diff --git a/website/source/docs/commands/reachability.html.markdown b/website/source/docs/commands/reachability.html.markdown new file mode 100644 index 000000000000..41e6f98fc62b --- /dev/null +++ b/website/source/docs/commands/reachability.html.markdown @@ -0,0 +1,42 @@ +--- +layout: "docs" +page_title: "Commands: Reachability" +sidebar_current: "docs-commands-reachability" +description: |- + The `reachability` command performs a basic network reachability test. The local node will gossip out a Serf ping message and request that all other nodes acknowledge delivery of the message. +--- + +# Consul Reachability + +Command: `consul reachability` + +The `reachability` command performs a basic network reachability test. +The local node will gossip out a Serf "ping" message and request that +all other nodes acknowledge delivery of the message. + +This can be used to troubleshoot configurations or network issues, since +nodes that are detected as having failed may respond, indicating false-failure +detection, or live nodes may fail to respond, indicating networking issues. + +In general, the following troubleshooting tips are recommended: + +* Ensure that the bind addr:port is accessible by all other nodes +* If an advertise address is set, ensure it routes to the bind address +* Check that no nodes are behind a NAT +* If nodes are behind firewalls or iptables, check that Serf traffic is permitted (UDP and TCP) +* Verify networking equipment is functional + +## Usage + +Usage: `consul reachability [options]` + +The following command-line options are available for this command. +Every option is optional: + +* `-rpc-addr` - Address to the RPC server of the agent you want to contact + to send this command. If this isn't specified, the command will contact + "127.0.0.1:8400" which is the default RPC address of a Consul + agent. This option can also be controlled using the `CONSUL_RPC_ADDR` + environment variable. + +* `-verbose` - Enables verbose output diff --git a/website/source/layouts/docs.erb b/website/source/layouts/docs.erb index 166b1655526b..933776636023 100644 --- a/website/source/layouts/docs.erb +++ b/website/source/layouts/docs.erb @@ -122,6 +122,14 @@ info +