Skip to content

Commit 43ca282

Browse files
committed
Refactor Transport State Struct
1 parent f4b0759 commit 43ca282

File tree

10 files changed

+197
-203
lines changed

10 files changed

+197
-203
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
RUN_E2E_TESTS=false
1+
RUN_E2E_TESTS=true
22
ELASTIC_APM_LOG_LEVEL=info

apm-lambda-extension/extension/apm_server.go

Lines changed: 54 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,10 @@ var bufferPool = sync.Pool{New: func() interface{} {
3636
return &bytes.Buffer{}
3737
}}
3838

39-
type ApmServerTransportStatusType string
40-
4139
// Constants for the state of the transport used in
4240
// the backoff implementation.
41+
type ApmServerTransportStatusType string
42+
4343
const (
4444
Failing ApmServerTransportStatusType = "Failing"
4545
Pending ApmServerTransportStatusType = "Pending"
@@ -48,34 +48,45 @@ const (
4848

4949
// A struct to track the state and status of sending
5050
// to the APM server. Used in the backoff implementation.
51-
type ApmServerTransportStateType struct {
51+
type ApmServerTransport struct {
52+
sync.Pool
5253
sync.Mutex
54+
ctx context.Context
55+
config *extensionConfig
56+
DataChannel chan AgentData
57+
Client *http.Client
5358
Status ApmServerTransportStatusType
5459
ReconnectionCount int
5560
GracePeriodTimer *time.Timer
5661
}
5762

58-
// The status of transport to the APM server.
59-
//
60-
// This instance of the ApmServerTransportStateType is public for use in tests.
61-
var ApmServerTransportState = ApmServerTransportStateType{
62-
Status: Healthy,
63-
ReconnectionCount: -1,
63+
func InitApmServerTransport(ctx context.Context, config *extensionConfig) *ApmServerTransport {
64+
var transport ApmServerTransport
65+
transport.DataChannel = make(chan AgentData, 100)
66+
transport.Client = &http.Client{
67+
Timeout: time.Duration(config.DataForwarderTimeoutSeconds) * time.Second,
68+
Transport: http.DefaultTransport.(*http.Transport).Clone(),
69+
}
70+
transport.config = config
71+
transport.ctx = ctx
72+
transport.Status = Healthy
73+
transport.ReconnectionCount = -1
74+
return &transport
6475
}
6576

66-
func StartBackgroundSending(ctx context.Context, agentDataChannel chan AgentData, client *http.Client, config *extensionConfig, funcDone chan struct{}, backgroundDataSendWg *sync.WaitGroup) {
77+
func StartBackgroundSending(transport *ApmServerTransport, funcDone chan struct{}, backgroundDataSendWg *sync.WaitGroup) {
6778
go func() {
6879
defer backgroundDataSendWg.Done()
69-
if !IsTransportStatusHealthyOrPending() {
80+
if transport.Status == Failing {
7081
return
7182
}
7283
for {
7384
select {
7485
case <-funcDone:
7586
Log.Debug("Received signal that function has completed, not processing any more agent data")
7687
return
77-
case agentData := <-agentDataChannel:
78-
if err := PostToApmServer(client, agentData, config, ctx); err != nil {
88+
case agentData := <-transport.DataChannel:
89+
if err := PostToApmServer(transport, agentData); err != nil {
7990
Log.Errorf("Error sending to APM server, skipping: %v", err)
8091
return
8192
}
@@ -89,10 +100,10 @@ func StartBackgroundSending(ctx context.Context, agentDataChannel chan AgentData
89100
// The function compresses the APM agent data, if it's not already compressed.
90101
// It sets the APM transport status to failing upon errors, as part of the backoff
91102
// strategy.
92-
func PostToApmServer(client *http.Client, agentData AgentData, config *extensionConfig, ctx context.Context) error {
103+
func PostToApmServer(transport *ApmServerTransport, agentData AgentData) error {
93104
// TODO: can this be a streaming (or streaming-style) call that keeps the
94105
// connection open across invocations?
95-
if !IsTransportStatusHealthyOrPending() {
106+
if transport.Status == Failing {
96107
return errors.New("transport status is unhealthy")
97108
}
98109

@@ -122,89 +133,81 @@ func PostToApmServer(client *http.Client, agentData AgentData, config *extension
122133
r = buf
123134
}
124135

125-
req, err := http.NewRequest("POST", config.apmServerUrl+endpointURI, r)
136+
req, err := http.NewRequest("POST", transport.config.apmServerUrl+endpointURI, r)
126137
if err != nil {
127138
return fmt.Errorf("failed to create a new request when posting to APM server: %v", err)
128139
}
129140
req.Header.Add("Content-Encoding", encoding)
130141
req.Header.Add("Content-Type", "application/x-ndjson")
131-
if config.apmServerApiKey != "" {
132-
req.Header.Add("Authorization", "ApiKey "+config.apmServerApiKey)
133-
} else if config.apmServerSecretToken != "" {
134-
req.Header.Add("Authorization", "Bearer "+config.apmServerSecretToken)
142+
if transport.config.apmServerApiKey != "" {
143+
req.Header.Add("Authorization", "ApiKey "+transport.config.apmServerApiKey)
144+
} else if transport.config.apmServerSecretToken != "" {
145+
req.Header.Add("Authorization", "Bearer "+transport.config.apmServerSecretToken)
135146
}
136147

137148
Log.Debug("Sending data chunk to APM Server")
138-
resp, err := client.Do(req)
149+
resp, err := transport.Client.Do(req)
139150
if err != nil {
140-
SetApmServerTransportState(Failing, ctx)
151+
SetApmServerTransportState(transport, Failing)
141152
return fmt.Errorf("failed to post to APM server: %v", err)
142153
}
143154

144155
// Read the response body
145156
defer resp.Body.Close()
146157
body, err := ioutil.ReadAll(resp.Body)
147158
if err != nil {
148-
SetApmServerTransportState(Failing, ctx)
159+
SetApmServerTransportState(transport, Failing)
149160
return fmt.Errorf("failed to read the response body after posting to the APM server")
150161
}
151162

152-
SetApmServerTransportState(Healthy, ctx)
163+
SetApmServerTransportState(transport, Healthy)
153164
Log.Debug("Transport status set to healthy")
154165
Log.Debugf("APM server response body: %v", string(body))
155166
Log.Debugf("APM server response status code: %v", resp.StatusCode)
156167
return nil
157168
}
158169

159-
// IsTransportStatusHealthyOrPending returns true if the APM server transport status is
160-
// healthy or pending, and false otherwise.
161-
//
162-
// This function is public for use in tests.
163-
func IsTransportStatusHealthyOrPending() bool {
164-
return ApmServerTransportState.Status != Failing
165-
}
166-
167170
// SetApmServerTransportState takes a state of the APM server transport and updates
168171
// the current state of the transport. For a change to a failing state, the grace period
169172
// is calculated and a goroutine is started that waits for that period to complete
170173
// before changing the status to "pending". This would allow a subsequent send attempt
171174
// to the APM server.
172175
//
173176
// This function is public for use in tests.
174-
func SetApmServerTransportState(status ApmServerTransportStatusType, ctx context.Context) {
177+
func SetApmServerTransportState(transport *ApmServerTransport, status ApmServerTransportStatusType) {
175178
switch status {
176179
case Healthy:
177-
ApmServerTransportState.Lock()
178-
ApmServerTransportState.Status = status
179-
Log.Debugf("APM Server Transport status set to %s", status)
180-
ApmServerTransportState.ReconnectionCount = -1
181-
ApmServerTransportState.Unlock()
180+
transport.Lock()
181+
transport.Status = status
182+
Log.Debugf("APM Server Transport status set to %s", transport.Status)
183+
transport.ReconnectionCount = -1
184+
transport.Unlock()
182185
case Failing:
183-
ApmServerTransportState.Lock()
184-
ApmServerTransportState.Status = status
185-
Log.Debugf("APM Server Transport status set to %s", status)
186-
ApmServerTransportState.ReconnectionCount++
187-
ApmServerTransportState.GracePeriodTimer = time.NewTimer(computeGracePeriod())
188-
Log.Debugf("Grace period entered, reconnection count : %d", ApmServerTransportState.ReconnectionCount)
186+
transport.Lock()
187+
transport.Status = status
188+
Log.Debugf("APM Server Transport status set to %s", transport.Status)
189+
transport.ReconnectionCount++
190+
transport.GracePeriodTimer = time.NewTimer(computeGracePeriod(transport))
191+
Log.Debugf("Grace period entered, reconnection count : %d", transport.ReconnectionCount)
189192
go func() {
190193
select {
191-
case <-ApmServerTransportState.GracePeriodTimer.C:
194+
case <-transport.GracePeriodTimer.C:
192195
Log.Debug("Grace period over - timer timed out")
193-
case <-ctx.Done():
196+
case <-transport.ctx.Done():
194197
Log.Debug("Grace period over - context done")
195198
}
196-
ApmServerTransportState.Status = Pending
197-
Log.Debugf("APM Server Transport status set to %s", status)
198-
ApmServerTransportState.Unlock()
199+
transport.Status = Pending
200+
Log.Debugf("APM Server Transport status set to %s", transport.Status)
201+
transport.Unlock()
199202
}()
200203
default:
201204
Log.Errorf("Cannot set APM Server Transport status to %s", status)
202205
}
203206
}
204207

205208
// computeGracePeriod returns the backoff grace period (with jitter) derived from the reconnection count, see https://github.com/elastic/apm/blob/main/specs/agents/transport.md#transport-errors
206-
func computeGracePeriod() time.Duration {
207-
gracePeriodWithoutJitter := math.Pow(math.Min(float64(ApmServerTransportState.ReconnectionCount), 6), 2)
209+
func computeGracePeriod(transport *ApmServerTransport) time.Duration {
210+
gracePeriodWithoutJitter := math.Pow(math.Min(float64(transport.ReconnectionCount), 6), 2)
208211
jitter := rand.Float64()/5 - 0.1
209212
return time.Duration((gracePeriodWithoutJitter + jitter*gracePeriodWithoutJitter) * float64(time.Second))
210213
}

0 commit comments

Comments
 (0)