Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restructure Packages and remove State Variables #191

Merged
merged 20 commits into from
May 20, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 101 additions & 57 deletions apm-lambda-extension/extension/apm_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,10 @@ import (
"time"
)

var bufferPool = sync.Pool{New: func() interface{} {
return &bytes.Buffer{}
}}

type ApmServerTransportStatusType string

// Constants for the state of the transport used in
// the backoff implementation.
type ApmServerTransportStatusType string

const (
Failing ApmServerTransportStatusType = "Failing"
Pending ApmServerTransportStatusType = "Pending"
Expand All @@ -48,30 +44,86 @@ const (

// A struct to track the state and status of sending
// to the APM server. Used in the backoff implementation.
type ApmServerTransportStateType struct {
type ApmServerTransport struct {
sync.Mutex
Status ApmServerTransportStatusType
ReconnectionCount int
GracePeriodTimer *time.Timer
bufferPool sync.Pool
jlvoiseux marked this conversation as resolved.
Show resolved Hide resolved
config *extensionConfig
AgentDoneSignal chan struct{}
dataChannel chan AgentData
client *http.Client
status ApmServerTransportStatusType
reconnectionCount int
gracePeriodTimer *time.Timer
}

// The status of transport to the APM server.
//
// This instance of the ApmServerTransportStateType is public for use in tests.
var ApmServerTransportState = ApmServerTransportStateType{
Status: Healthy,
ReconnectionCount: -1,
func InitApmServerTransport(config *extensionConfig) *ApmServerTransport {
var transport ApmServerTransport
transport.bufferPool = sync.Pool{New: func() interface{} {
return &bytes.Buffer{}
}}
transport.AgentDoneSignal = make(chan struct{}, 1)
axw marked this conversation as resolved.
Show resolved Hide resolved
transport.dataChannel = make(chan AgentData, 100)
transport.client = &http.Client{
Timeout: time.Duration(config.DataForwarderTimeoutSeconds) * time.Second,
Transport: http.DefaultTransport.(*http.Transport).Clone(),
}
transport.config = config
transport.status = Healthy
transport.reconnectionCount = -1
return &transport
}

// StartBackgroundApmDataForwarding Receive agent data as it comes in and post it to the APM server.
// Stop checking for, and sending agent data when the function invocation
// has completed, signaled via a channel.
func (transport *ApmServerTransport) ForwardApmData(ctx context.Context, backgroundDataSendWg *sync.WaitGroup) error {
defer backgroundDataSendWg.Done()
if transport.status == Failing {
return nil
}
for {
select {
case <-ctx.Done():
Log.Debug("Invocation context cancelled, not processing any more agent data")
return nil
case agentData := <-transport.dataChannel:
if err := PostToApmServer(ctx, transport, agentData); err != nil {
return fmt.Errorf("error sending to APM server, skipping: %v", err)
}
}
}
}

// FlushAPMData reads all the apm data in the apm data channel and sends it to the APM server.
func FlushAPMData(ctx context.Context, transport *ApmServerTransport) {
jlvoiseux marked this conversation as resolved.
Show resolved Hide resolved
if transport.status == Failing {
Log.Debug("Flush skipped - Transport failing")
return
}
Log.Debug("Flush started - Checking for agent data")
for {
select {
case agentData := <-transport.dataChannel:
Log.Debug("Flush in progress - Processing agent data")
if err := PostToApmServer(ctx, transport, agentData); err != nil {
Log.Errorf("Error sending to APM server, skipping: %v", err)
}
default:
Log.Debug("Flush ended - No agent data on buffer")
return
}
}
}

// PostToApmServer takes a chunk of APM agent data and posts it to the APM server.
//
// The function compresses the APM agent data, if it's not already compressed.
// It sets the APM transport status to failing upon errors, as part of the backoff
// strategy.
func PostToApmServer(client *http.Client, agentData AgentData, config *extensionConfig, ctx context.Context) error {
func PostToApmServer(ctx context.Context, transport *ApmServerTransport, agentData AgentData) error {
jlvoiseux marked this conversation as resolved.
Show resolved Hide resolved
// todo: can this be a streaming or streaming style call that keeps the
// connection open across invocations?
if !IsTransportStatusHealthyOrPending() {
if transport.status == Failing {
return errors.New("transport status is unhealthy")
}

Expand All @@ -83,10 +135,10 @@ func PostToApmServer(client *http.Client, agentData AgentData, config *extension
r = bytes.NewReader(agentData.Data)
} else {
encoding = "gzip"
buf := bufferPool.Get().(*bytes.Buffer)
buf := transport.bufferPool.Get().(*bytes.Buffer)
defer func() {
buf.Reset()
bufferPool.Put(buf)
transport.bufferPool.Put(buf)
}()
gw, err := gzip.NewWriterLevel(buf, gzip.BestSpeed)
if err != nil {
Expand All @@ -101,89 +153,81 @@ func PostToApmServer(client *http.Client, agentData AgentData, config *extension
r = buf
}

req, err := http.NewRequest("POST", config.apmServerUrl+endpointURI, r)
req, err := http.NewRequest("POST", transport.config.apmServerUrl+endpointURI, r)
if err != nil {
return fmt.Errorf("failed to create a new request when posting to APM server: %v", err)
}
req.Header.Add("Content-Encoding", encoding)
req.Header.Add("Content-Type", "application/x-ndjson")
if config.apmServerApiKey != "" {
req.Header.Add("Authorization", "ApiKey "+config.apmServerApiKey)
} else if config.apmServerSecretToken != "" {
req.Header.Add("Authorization", "Bearer "+config.apmServerSecretToken)
if transport.config.apmServerApiKey != "" {
req.Header.Add("Authorization", "ApiKey "+transport.config.apmServerApiKey)
} else if transport.config.apmServerSecretToken != "" {
req.Header.Add("Authorization", "Bearer "+transport.config.apmServerSecretToken)
}

Log.Debug("Sending data chunk to APM Server")
resp, err := client.Do(req)
Log.Debug("Sending data chunk to APM server")
resp, err := transport.client.Do(req)
if err != nil {
SetApmServerTransportState(Failing, ctx)
SetApmServerTransportState(ctx, transport, Failing)
return fmt.Errorf("failed to post to APM server: %v", err)
}

//Read the response body
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
SetApmServerTransportState(Failing, ctx)
SetApmServerTransportState(ctx, transport, Failing)
return fmt.Errorf("failed to read the response body after posting to the APM server")
}

SetApmServerTransportState(Healthy, ctx)
SetApmServerTransportState(ctx, transport, Healthy)
Log.Debug("Transport status set to healthy")
Log.Debugf("APM server response body: %v", string(body))
Log.Debugf("APM server response status code: %v", resp.StatusCode)
return nil
}

// IsTransportStatusHealthyOrPending returns true if the APM server transport status is
// healthy or pending, and false otherwise.
//
// This function is public for use in tests.
func IsTransportStatusHealthyOrPending() bool {
return ApmServerTransportState.Status != Failing
}

// SetApmServerTransportState takes a state of the APM server transport and updates
// the current state of the transport. For a change to a failing state, the grace period
// is calculated and a go routine is started that waits for that period to complete
// before changing the status to "pending". This would allow a subsequent send attempt
// to the APM server.
//
// This function is public for use in tests.
func SetApmServerTransportState(status ApmServerTransportStatusType, ctx context.Context) {
func SetApmServerTransportState(ctx context.Context, transport *ApmServerTransport, status ApmServerTransportStatusType) {
jlvoiseux marked this conversation as resolved.
Show resolved Hide resolved
switch status {
case Healthy:
ApmServerTransportState.Lock()
ApmServerTransportState.Status = status
Log.Debugf("APM Server Transport status set to %s", status)
ApmServerTransportState.ReconnectionCount = -1
ApmServerTransportState.Unlock()
transport.Lock()
transport.status = status
Log.Debugf("APM server Transport status set to %s", transport.status)
transport.reconnectionCount = -1
transport.Unlock()
case Failing:
ApmServerTransportState.Lock()
ApmServerTransportState.Status = status
Log.Debugf("APM Server Transport status set to %s", status)
ApmServerTransportState.ReconnectionCount++
ApmServerTransportState.GracePeriodTimer = time.NewTimer(computeGracePeriod())
Log.Debugf("Grace period entered, reconnection count : %d", ApmServerTransportState.ReconnectionCount)
transport.Lock()
transport.status = status
Log.Debugf("APM server Transport status set to %s", transport.status)
transport.reconnectionCount++
transport.gracePeriodTimer = time.NewTimer(computeGracePeriod(transport))
Log.Debugf("Grace period entered, reconnection count : %d", transport.reconnectionCount)
go func() {
select {
case <-ApmServerTransportState.GracePeriodTimer.C:
case <-transport.gracePeriodTimer.C:
Log.Debug("Grace period over - timer timed out")
case <-ctx.Done():
Log.Debug("Grace period over - context done")
}
ApmServerTransportState.Status = Pending
Log.Debugf("APM Server Transport status set to %s", status)
ApmServerTransportState.Unlock()
transport.status = Pending
Log.Debugf("APM server Transport status set to %s", transport.status)
transport.Unlock()
}()
default:
Log.Errorf("Cannot set APM Server Transport status to %s", status)
Log.Errorf("Cannot set APM server Transport status to %s", status)
}
}

// ComputeGracePeriod https://github.com/elastic/apm/blob/main/specs/agents/transport.md#transport-errors
func computeGracePeriod() time.Duration {
gracePeriodWithoutJitter := math.Pow(math.Min(float64(ApmServerTransportState.ReconnectionCount), 6), 2)
func computeGracePeriod(transport *ApmServerTransport) time.Duration {
jlvoiseux marked this conversation as resolved.
Show resolved Hide resolved
gracePeriodWithoutJitter := math.Pow(math.Min(float64(transport.reconnectionCount), 6), 2)
jitter := rand.Float64()/5 - 0.1
return time.Duration((gracePeriodWithoutJitter + jitter*gracePeriodWithoutJitter) * float64(time.Second))
}
Expand Down
Loading