
Commit 8fb326d

Merge pull request #168 from deploymenttheory/dev
implemented EvaluateAndAdjustConcurrency metrics decision maker
2 parents 389389d + cce6e69 commit 8fb326d

4 files changed: 129 additions, 91 deletions


concurrency/metrics.go

Lines changed: 90 additions & 82 deletions
@@ -1,3 +1,4 @@
+// concurrency/metrics.go
 package concurrency
 
 import (
@@ -9,48 +10,94 @@ import (
 	"go.uber.org/zap"
 )
 
-// MonitorRateLimitHeaders monitors the rate limit headers (X-RateLimit-Remaining and Retry-After)
-// in the HTTP response and adjusts concurrency accordingly.
-// If X-RateLimit-Remaining is below a threshold or Retry-After is specified, decrease concurrency.
-// If neither condition is met, consider scaling up if concurrency is below the maximum limit.
-// - Threshold for X-RateLimit-Remaining: 10
-// - Maximum concurrency: MaxConcurrency
-func (ch *ConcurrencyHandler) MonitorRateLimitHeaders(resp *http.Response) {
-	// Extract X-RateLimit-Remaining and Retry-After headers from the response
+// EvaluateAndAdjustConcurrency evaluates the HTTP response from a server along with the request's response time
+// and adjusts the concurrency level of the system accordingly. It utilizes three monitoring functions:
+// MonitorRateLimitHeaders, MonitorServerResponseCodes, and MonitorResponseTimeVariability, each of which
+// provides feedback on different aspects of the response and system's current state. The function aggregates
+// feedback from these monitoring functions to make a decision on whether to scale up or scale down the concurrency.
+// The decision is based on a simple majority of suggestions: if more functions suggest scaling down (return -1),
+// it scales down; if more suggest scaling up (return 1), it scales up. This method centralizes concurrency control
+// decision-making, providing a systematic approach to managing request handling capacity based on real-time
+// operational metrics.
+//
+// Parameters:
+//
+//	resp - The HTTP response received from the server.
+//	responseTime - The time duration between sending the request and receiving the response.
+//
+// It logs the specific reason for scaling decisions, helping in traceability and fine-tuning system performance.
+func (ch *ConcurrencyHandler) EvaluateAndAdjustConcurrency(resp *http.Response, responseTime time.Duration) {
+	// Call monitoring functions
+	rateLimitFeedback := ch.MonitorRateLimitHeaders(resp)
+	responseCodeFeedback := ch.MonitorServerResponseCodes(resp)
+	responseTimeFeedback := ch.MonitorResponseTimeVariability(responseTime)
+
+	// Log the feedback from each monitoring function for debugging
+	ch.logger.Debug("Concurrency Adjustment Feedback",
+		zap.Int("RateLimitFeedback", rateLimitFeedback),
+		zap.Int("ResponseCodeFeedback", responseCodeFeedback),
+		zap.Int("ResponseTimeFeedback", responseTimeFeedback))
+
+	// Determine overall action based on feedback
+	suggestions := []int{rateLimitFeedback, responseCodeFeedback, responseTimeFeedback}
+	scaleDownCount := 0
+	scaleUpCount := 0
+
+	for _, suggestion := range suggestions {
+		switch suggestion {
+		case -1:
+			scaleDownCount++
+		case 1:
+			scaleUpCount++
+		}
+	}
+
+	// Log the counts for scale down and up suggestions
+	ch.logger.Info("Scaling Decision Counts",
+		zap.Int("ScaleDownCount", scaleDownCount),
+		zap.Int("ScaleUpCount", scaleUpCount))
+
+	// Decide on scaling action
+	if scaleDownCount > scaleUpCount {
+		ch.logger.Info("Scaling down the concurrency", zap.String("Reason", "More signals suggested to decrease concurrency"))
+		ch.ScaleDown()
+	} else if scaleUpCount > scaleDownCount {
+		ch.logger.Info("Scaling up the concurrency", zap.String("Reason", "More signals suggested to increase concurrency"))
+		ch.ScaleUp()
+	} else {
+		ch.logger.Info("No change in concurrency", zap.String("Reason", "Equal signals for both scaling up and down"))
+	}
+}
+
+// MonitorRateLimitHeaders monitors the rate limit headers in the response and suggests a concurrency adjustment.
+func (ch *ConcurrencyHandler) MonitorRateLimitHeaders(resp *http.Response) int {
 	remaining := resp.Header.Get("X-RateLimit-Remaining")
 	retryAfter := resp.Header.Get("Retry-After")
+	suggestion := 0
 
 	if remaining != "" {
 		remainingValue, err := strconv.Atoi(remaining)
 		if err == nil && remainingValue < 10 {
-			// Decrease concurrency if X-RateLimit-Remaining is below the threshold
-			if len(ch.sem) > MinConcurrency {
-				newSize := len(ch.sem) - 1
-				ch.logger.Info("Reducing concurrency due to low X-RateLimit-Remaining", zap.Int("NewSize", newSize))
-				ch.ResizeSemaphore(newSize)
-			}
+			// Suggest decrease concurrency if X-RateLimit-Remaining is below the threshold
+			suggestion = -1
 		}
 	}
 
 	if retryAfter != "" {
-		// Decrease concurrency if Retry-After is specified
-		if len(ch.sem) > MinConcurrency {
-			newSize := len(ch.sem) - 1
-			ch.logger.Info("Reducing concurrency due to Retry-After header", zap.Int("NewSize", newSize))
-			ch.ResizeSemaphore(newSize)
-		}
+		// Suggest decrease concurrency if Retry-After is specified
+		suggestion = -1
	} else {
-		// Scale up if concurrency is below the maximum limit
-		if len(ch.sem) < MaxConcurrency {
-			newSize := len(ch.sem) + 1
-			ch.logger.Info("Increasing concurrency", zap.Int("NewSize", newSize))
-			ch.ResizeSemaphore(newSize)
+		// Suggest increase concurrency if currently below maximum limit and no other decrease suggestion has been made
+		if len(ch.sem) < MaxConcurrency && suggestion == 0 {
+			suggestion = 1
 		}
 	}
+
+	return suggestion
 }
 
-// MonitorServerResponseCodes monitors server response codes and adjusts concurrency accordingly.
-func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) {
+// MonitorServerResponseCodes monitors the response status codes and suggests a concurrency adjustment.
+func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) int {
 	statusCode := resp.StatusCode
 
 	// Lock the metrics to ensure thread safety
@@ -63,7 +110,6 @@ func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) {
 		ch.Metrics.TotalRateLimitErrors++
 	case statusCode >= 400 && statusCode < 500:
 		// Assuming 4xx errors as client errors
-		// Increase the TotalRetries count to indicate a client error
 		ch.Metrics.TotalRetries++
 	}
 
@@ -75,23 +121,19 @@ func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) {
 	// Set the new error rate in the metrics
 	ch.Metrics.ResponseCodeMetrics.ErrorRate = errorRate
 
-	// Check if the error rate exceeds the threshold and adjust concurrency accordingly
-	if errorRate > ErrorRateThreshold && len(ch.sem) > MinConcurrency {
-		// Decrease concurrency
-		newSize := len(ch.sem) - 1
-		ch.logger.Info("Reducing request concurrency due to high error rate", zap.Int("NewSize", newSize))
-		ch.ResizeSemaphore(newSize)
+	// Determine action based on the error rate
+	if errorRate > ErrorRateThreshold {
+		// Suggest decrease concurrency
+		return -1
 	} else if errorRate <= ErrorRateThreshold && len(ch.sem) < MaxConcurrency {
-		// Scale up if error rate is below the threshold and concurrency is below the maximum limit
-		newSize := len(ch.sem) + 1
-		ch.logger.Info("Increasing request concurrency due to low error rate", zap.Int("NewSize", newSize))
-		ch.ResizeSemaphore(newSize)
+		// Suggest increase concurrency if there is capacity
+		return 1
 	}
+	return 0
 }
 
-// MonitorResponseTimeVariability calculates the standard deviation of response times
-// and uses moving averages to smooth out fluctuations, adjusting concurrency accordingly.
-func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.Duration) {
+// MonitorResponseTimeVariability monitors the response time variability and suggests a concurrency adjustment.
+func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.Duration) int {
 	ch.Metrics.Lock.Lock()
 	defer ch.Metrics.Lock.Unlock()
 
@@ -110,17 +152,15 @@ func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.Duration) {
 	// Calculate standard deviation of response times
 	stdDev := math.Sqrt(ch.Metrics.ResponseTimeVariability.Variance)
 
-	// Adjust concurrency based on response time variability
-	if stdDev > ch.Metrics.ResponseTimeVariability.StdDevThreshold && len(ch.sem) > MinConcurrency {
-		newSize := len(ch.sem) - 1
-		ch.logger.Info("Reducing request concurrency due to high response time variability", zap.Int("NewSize", newSize))
-		ch.ResizeSemaphore(newSize)
+	// Determine action based on standard deviation
+	if stdDev > ch.Metrics.ResponseTimeVariability.StdDevThreshold {
+		// Suggest decrease concurrency
+		return -1
 	} else if stdDev <= ch.Metrics.ResponseTimeVariability.StdDevThreshold && len(ch.sem) < MaxConcurrency {
-		// Scale up if response time variability is below the threshold and concurrency is below the maximum limit
-		newSize := len(ch.sem) + 1
-		ch.logger.Info("Increasing request concurrency due to low response time variability", zap.Int("NewSize", newSize))
-		ch.ResizeSemaphore(newSize)
+		// Suggest increase concurrency if there is capacity
+		return 1
 	}
+	return 0
 }
 
 // calculateVariance calculates the variance of response times.
@@ -134,35 +174,3 @@ func (ch *ConcurrencyHandler) calculateVariance(averageResponseTime time.Duratio
 	ch.Metrics.ResponseTimeVariability.Variance = variance
 	return variance
 }
-
-// MonitorNetworkLatency measures Time to First Byte (TTFB) and monitors network throughput,
-// adjusting concurrency based on changes in network latency and throughput.
-func (ch *ConcurrencyHandler) MonitorNetworkLatency(ttfb time.Duration, throughput float64) {
-	ch.Metrics.Lock.Lock()
-	defer ch.Metrics.Lock.Unlock()
-
-	// Calculate the TTFB moving average
-	ch.Metrics.TTFB.Lock.Lock()
-	defer ch.Metrics.TTFB.Lock.Unlock()
-	ch.Metrics.TTFB.Total += ttfb
-	ch.Metrics.TTFB.Count++
-	ttfbMovingAverage := ch.Metrics.TTFB.Total / time.Duration(ch.Metrics.TTFB.Count)
-
-	// Calculate the throughput moving average
-	ch.Metrics.Throughput.Lock.Lock()
-	defer ch.Metrics.Throughput.Lock.Unlock()
-	ch.Metrics.Throughput.Total += throughput
-	ch.Metrics.Throughput.Count++
-	throughputMovingAverage := ch.Metrics.Throughput.Total / float64(ch.Metrics.Throughput.Count)
-
-	// Adjust concurrency based on TTFB and throughput moving averages
-	if ttfbMovingAverage > MaxAcceptableTTFB && len(ch.sem) > MinConcurrency {
-		newSize := len(ch.sem) - 1
-		ch.logger.Info("Reducing request concurrency due to high TTFB", zap.Int("NewSize", newSize))
-		ch.ResizeSemaphore(newSize)
-	} else if throughputMovingAverage > MaxAcceptableThroughput && len(ch.sem) < MaxConcurrency {
-		newSize := len(ch.sem) + 1
-		ch.logger.Info("Increasing request concurrency due to high throughput", zap.Int("NewSize", newSize))
-		ch.ResizeSemaphore(newSize)
-	}
-}
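
For illustration only, here is a minimal standalone sketch of the majority-vote rule that EvaluateAndAdjustConcurrency applies to the three monitor results; the decide helper below is hypothetical and not part of the concurrency package.

package main

import "fmt"

// decide mirrors the majority-vote rule: each monitor votes -1 (scale down),
// 0 (no opinion) or 1 (scale up); the direction with more votes wins, and a tie
// leaves concurrency unchanged. Standalone illustration, not repository code.
func decide(votes ...int) string {
	scaleDown, scaleUp := 0, 0
	for _, v := range votes {
		switch v {
		case -1:
			scaleDown++
		case 1:
			scaleUp++
		}
	}
	switch {
	case scaleDown > scaleUp:
		return "scale down"
	case scaleUp > scaleDown:
		return "scale up"
	default:
		return "no change"
	}
}

func main() {
	// Headers healthy (+1), error rate high (-1), response times stable (+1) -> scale up.
	fmt.Println(decide(1, -1, 1))
	// Retry-After seen (-1), error rate high (-1), response times stable (+1) -> scale down.
	fmt.Println(decide(-1, -1, 1))
}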

concurrency/resize.go

Lines changed: 0 additions & 1 deletion
@@ -1,5 +1,4 @@
 // concurrency/resize.go
-
 package concurrency
 
 // ResizeSemaphore adjusts the size of the semaphore used to control concurrency. This method creates a new
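
The trailing context above only notes that ResizeSemaphore "creates a new" semaphore; its body is not part of this diff. The following is therefore only a rough sketch, under the assumption that the semaphore is a buffered channel, of how such a resize could carry currently held slots across.

package main

import "fmt"

// resizeSemaphore sketches one way a channel-based semaphore could be rebuilt with a
// new capacity while preserving slots currently held by in-flight work. The real
// ResizeSemaphore implementation may differ; treat this as illustrative only.
func resizeSemaphore(old chan struct{}, newSize int) chan struct{} {
	newSem := make(chan struct{}, newSize)
	// Transfer held slots, but never more than the new capacity allows.
	for len(old) > 0 && len(newSem) < newSize {
		newSem <- <-old
	}
	return newSem
}

func main() {
	sem := make(chan struct{}, 3)
	sem <- struct{}{} // one request currently holds a slot
	sem = resizeSemaphore(sem, 5)
	fmt.Println(cap(sem), len(sem)) // 5 1: new limit, same occupancy
}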

concurrency/scale.go

Lines changed: 37 additions & 0 deletions
@@ -0,0 +1,37 @@
+// concurrency/scale.go
+package concurrency
+
+import "go.uber.org/zap"
+
+// ScaleDown reduces the concurrency level by one, down to the minimum limit.
+func (ch *ConcurrencyHandler) ScaleDown() {
+	// Lock to ensure thread safety
+	ch.lock.Lock()
+	defer ch.lock.Unlock()
+
+	// We must consider the capacity rather than the length of the semaphore channel
+	currentSize := cap(ch.sem)
+	if currentSize > MinConcurrency {
+		newSize := currentSize - 1
+		ch.logger.Info("Reducing request concurrency", zap.Int("currentSize", currentSize), zap.Int("newSize", newSize))
+		ch.ResizeSemaphore(newSize)
+	} else {
+		ch.logger.Info("Concurrency already at minimum level; cannot reduce further", zap.Int("currentSize", currentSize))
+	}
+}
+
+// ScaleUp increases the concurrency level by one, up to the maximum limit.
+func (ch *ConcurrencyHandler) ScaleUp() {
+	// Lock to ensure thread safety
+	ch.lock.Lock()
+	defer ch.lock.Unlock()
+
+	currentSize := cap(ch.sem)
+	if currentSize < MaxConcurrency {
+		newSize := currentSize + 1
+		ch.logger.Info("Increasing request concurrency", zap.Int("currentSize", currentSize), zap.Int("newSize", newSize))
+		ch.ResizeSemaphore(newSize)
+	} else {
+		ch.logger.Info("Concurrency already at maximum level; cannot increase further", zap.Int("currentSize", currentSize))
+	}
+}
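
The ScaleDown comment stresses reading cap(ch.sem) rather than len(ch.sem). Assuming the semaphore is a buffered channel (as the len/cap usage in this diff suggests), this small sketch shows why: cap reports the configured limit, while len only reports momentary occupancy.

package main

import "fmt"

func main() {
	// A buffered channel used as a semaphore: cap(sem) is the configured concurrency
	// limit, while len(sem) only reflects how many slots are occupied at this instant.
	sem := make(chan struct{}, 5)
	sem <- struct{}{} // one request in flight

	fmt.Println(cap(sem)) // 5 -> the limit ScaleUp/ScaleDown should compare against
	fmt.Println(len(sem)) // 1 -> transient occupancy, misleading as a "current size"
}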

httpclient/request.go

Lines changed: 2 additions & 8 deletions
@@ -342,14 +342,8 @@ func (c *Client) executeRequest(method, endpoint string, body, out interface{})
 	// Calculate the duration between sending the request and receiving the response
 	duration := time.Since(startTime)
 
-	// Monitor response time variability
-	c.ConcurrencyHandler.MonitorResponseTimeVariability(duration)
-
-	// Monitor server response codes
-	c.ConcurrencyHandler.MonitorServerResponseCodes(resp)
-
-	// Monitor rate limit headers
-	c.ConcurrencyHandler.MonitorRateLimitHeaders(resp)
+	// Evaluate and adjust concurrency based on the request's feedback
+	c.ConcurrencyHandler.EvaluateAndAdjustConcurrency(resp, duration)
 
 	// Log outgoing cookies
 	log.LogCookies("incoming", req, method, endpoint)
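
As a hedged usage sketch (not taken from httpclient/request.go), the pattern this change consolidates looks roughly like the following from a caller's point of view: time the request, then hand both the response and the measured duration to the single evaluation entry point.

package httpfeedback // illustrative package, not part of the repository

import (
	"net/http"
	"time"
)

// adjuster captures the shape of the consolidated feedback call introduced by this
// commit; ConcurrencyHandler.EvaluateAndAdjustConcurrency satisfies it.
type adjuster interface {
	EvaluateAndAdjustConcurrency(resp *http.Response, responseTime time.Duration)
}

// doWithFeedback times a request and reports the response plus its duration to the
// single evaluation entry point, mirroring what executeRequest now does.
func doWithFeedback(client *http.Client, req *http.Request, a adjuster) (*http.Response, error) {
	start := time.Now()
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	a.EvaluateAndAdjustConcurrency(resp, time.Since(start))
	return resp, nil
}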

0 commit comments
