Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -1456,3 +1456,45 @@ func (a *Agent) DisableNodeMaintenance() {
a.RemoveCheck(nodeMaintCheckID, true)
a.logger.Printf("[INFO] agent: Node left maintenance mode")
}

// SerfQuery sends a Query on Serf, see Serf.Query.
func (a *Agent) SerfQuery(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) {
a.logger.Printf("[DEBUG] agent: Requesting serf query send: %s. Payload: %#v",
name, string(payload))
var resp *serf.QueryResponse
var err error
if a.server != nil {
resp, err = a.server.SerfQuery(name, payload, params)
} else {
resp, err = a.client.SerfQuery(name, payload, params)
}
if err != nil {
a.logger.Printf("[WARN] agent: failed to start user serf query: %v", err)
}
return resp, err
}

// SerfPing sends a Memberlist Ping via Serf.
func (a *Agent) SerfPing(name string) (*serfPingResponse, error) {
a.logger.Printf("[DEBUG] agent: Requesting serf ping send: %s", name)
var err error
var cresp *consul.SerfPingResponse
params := consul.SerfPingParam{
Name: name,
}
if a.server != nil {
cresp, err = a.server.SerfPing(&params)
} else {
cresp, err = a.client.SerfPing(&params)
}
if err != nil {
a.logger.Printf("[WARN] agent: failed to start user serf ping: %v", err)
return nil, err
}

resp := serfPingResponse{
Success: cresp.Success,
RTT: cresp.RTT,
}
return &resp, nil
}
91 changes: 91 additions & 0 deletions command/agent/ipc_serf_query_response_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package agent

import (
"github.com/hashicorp/serf/serf"
"log"
"time"
)

// serfQueryResponseStream is used to stream the serf query results back to a client
type serfQueryResponseStream struct {
client streamClient
logger *log.Logger
seq uint64
}

func newSerfQueryResponseStream(client streamClient, seq uint64, logger *log.Logger) *serfQueryResponseStream {
qs := &serfQueryResponseStream{
client: client,
logger: logger,
seq: seq,
}
return qs
}

// Stream is a long running routine used to stream the results of a serf query back to a client
func (qs *serfQueryResponseStream) Stream(resp *serf.QueryResponse) {
// Setup a timer for the serf query ending
remaining := resp.Deadline().Sub(time.Now())
done := time.After(remaining)

ackCh := resp.AckCh()
respCh := resp.ResponseCh()
for {
select {
case a := <-ackCh:
if err := qs.sendAck(a); err != nil {
qs.logger.Printf("[ERR] agent.ipc: Failed to stream serf query ack to %v: %v", qs.client, err)
return
}
case r := <-respCh:
if err := qs.sendResponse(r.From, r.Payload); err != nil {
qs.logger.Printf("[ERR] agent.ipc: Failed to stream serf query response to %v: %v", qs.client, err)
return
}
case <-done:
if err := qs.sendDone(); err != nil {
qs.logger.Printf("[ERR] agent.ipc: Failed to stream serf query end to %v: %v", qs.client, err)
}
return
}
}
}

// sendAck is used to send a single ack
func (qs *serfQueryResponseStream) sendAck(from string) error {
header := responseHeader{
Seq: qs.seq,
Error: "",
}
rec := serfQueryRecord{
Type: serfQueryRecordAck,
From: from,
}
return qs.client.Send(&header, &rec)
}

// sendResponse is used to send a single response
func (qs *serfQueryResponseStream) sendResponse(from string, payload []byte) error {
header := responseHeader{
Seq: qs.seq,
Error: "",
}
rec := serfQueryRecord{
Type: serfQueryRecordResponse,
From: from,
Payload: payload,
}
return qs.client.Send(&header, &rec)
}

// sendDone is used to signal the end
func (qs *serfQueryResponseStream) sendDone() error {
header := responseHeader{
Seq: qs.seq,
Error: "",
}
rec := serfQueryRecord{
Type: serfQueryRecordDone,
}
return qs.client.Send(&header, &rec)
}
67 changes: 67 additions & 0 deletions command/agent/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"os"
"strings"
"sync"
"time"

"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-msgpack/codec"
Expand Down Expand Up @@ -57,6 +58,8 @@ const (
useKeyCommand = "use-key"
removeKeyCommand = "remove-key"
listKeysCommand = "list-keys"
serfQueryCommand = "serf-query"
serfPingCommand = "serf-ping"
)

const (
Expand Down Expand Up @@ -188,6 +191,11 @@ type memberEventRecord struct {
Members []Member
}

type serfPingResponse struct {
Success bool
RTT time.Duration
}

type AgentRPC struct {
sync.Mutex
agent *Agent
Expand Down Expand Up @@ -410,6 +418,12 @@ func (i *AgentRPC) handleRequest(client *rpcClient, reqHeader *requestHeader) er
case installKeyCommand, useKeyCommand, removeKeyCommand, listKeysCommand:
return i.handleKeyring(client, seq, command, token)

case serfQueryCommand:
return i.handleSerfQuery(client, seq)

case serfPingCommand:
return i.handleSerfPing(client, seq)

default:
respHeader := responseHeader{Seq: seq, Error: unsupportedCommand}
client.Send(&respHeader, nil)
Expand Down Expand Up @@ -578,6 +592,59 @@ func (i *AgentRPC) handleStop(client *rpcClient, seq uint64) error {
return client.Send(&resp, nil)
}

func (i *AgentRPC) handleSerfQuery(client *rpcClient, seq uint64) error {
var req serfQueryRequest
if err := client.dec.Decode(&req); err != nil {
return fmt.Errorf("decode failed: %v", err)
}

// Setup the serf query
params := serf.QueryParam{
FilterNodes: req.FilterNodes,
FilterTags: req.FilterTags,
RequestAck: req.RequestAck,
Timeout: req.Timeout,
}

// Start the serf query
serfQueryResp, err := i.agent.SerfQuery(req.Name, req.Payload, &params)

// Stream the serf query responses
if err == nil {
qs := newSerfQueryResponseStream(client, seq, i.logger)
defer func() {
go qs.Stream(serfQueryResp)
}()
}

// Respond
resp := responseHeader{
Seq: seq,
Error: errToString(err),
}

return client.Send(&resp, nil)
}

func (i *AgentRPC) handleSerfPing(client *rpcClient, seq uint64) error {
var req serfPingRequest
if err := client.dec.Decode(&req); err != nil {
return fmt.Errorf("decode failed: %v", err)
}

i.logger.Printf("[DEBUG] agent.rpc: handling serf ping request")

// Start the serf query
resp, err := i.agent.SerfPing(req.Name)

header := responseHeader{
Seq: seq,
Error: errToString(err),
}

return client.Send(&header, &resp)
}

func (i *AgentRPC) handleLeave(client *rpcClient, seq uint64) error {
i.logger.Printf("[INFO] agent.rpc: Graceful leave triggered")

Expand Down
Loading