
Commit f3c8c57

Refactor concurrency package to include new metrics and remove unused code
1 parent 1d54732 commit f3c8c57

File tree

concurrency/metrics.go
concurrency/scale.go
httpclient/request.go

3 files changed: +97 -92 lines changed

concurrency/metrics.go

Lines changed: 59 additions & 84 deletions
@@ -5,52 +5,66 @@ import (
 	"net/http"
 	"strconv"
 	"time"
-
-	"go.uber.org/zap"
 )
 
-// MonitorRateLimitHeaders monitors the rate limit headers (X-RateLimit-Remaining and Retry-After)
-// in the HTTP response and adjusts concurrency accordingly.
-// If X-RateLimit-Remaining is below a threshold or Retry-After is specified, decrease concurrency.
-// If neither condition is met, consider scaling up if concurrency is below the maximum limit.
-// - Threshold for X-RateLimit-Remaining: 10
-// - Maximum concurrency: MaxConcurrency
-func (ch *ConcurrencyHandler) MonitorRateLimitHeaders(resp *http.Response) {
-	// Extract X-RateLimit-Remaining and Retry-After headers from the response
+// EvaluateAndAdjustConcurrency evaluates the response from the server and adjusts the concurrency level accordingly.
+func (ch *ConcurrencyHandler) EvaluateAndAdjustConcurrency(resp *http.Response, responseTime time.Duration) {
+	// Call monitoring functions
+	rateLimitFeedback := ch.MonitorRateLimitHeaders(resp)
+	responseCodeFeedback := ch.MonitorServerResponseCodes(resp)
+	responseTimeFeedback := ch.MonitorResponseTimeVariability(responseTime)
+
+	// Determine overall action based on feedback
+	suggestions := []int{rateLimitFeedback, responseCodeFeedback, responseTimeFeedback}
+	scaleDownCount := 0
+	scaleUpCount := 0
+
+	for _, suggestion := range suggestions {
+		switch suggestion {
+		case -1:
+			scaleDownCount++
+		case 1:
+			scaleUpCount++
+		}
+	}
+
+	// Decide on scaling action
+	if scaleDownCount > scaleUpCount {
+		ch.ScaleDown()
+	} else if scaleUpCount > scaleDownCount {
+		ch.ScaleUp()
+	}
+}
+
+// MonitorRateLimitHeaders monitors the rate limit headers in the response and suggests a concurrency adjustment.
+func (ch *ConcurrencyHandler) MonitorRateLimitHeaders(resp *http.Response) int {
 	remaining := resp.Header.Get("X-RateLimit-Remaining")
 	retryAfter := resp.Header.Get("Retry-After")
+	suggestion := 0
 
 	if remaining != "" {
 		remainingValue, err := strconv.Atoi(remaining)
 		if err == nil && remainingValue < 10 {
-			// Decrease concurrency if X-RateLimit-Remaining is below the threshold
-			if len(ch.sem) > MinConcurrency {
-				newSize := len(ch.sem) - 1
-				ch.logger.Info("Reducing concurrency due to low X-RateLimit-Remaining", zap.Int("NewSize", newSize))
-				ch.ResizeSemaphore(newSize)
-			}
+			// Suggest decrease concurrency if X-RateLimit-Remaining is below the threshold
+			suggestion = -1
 		}
 	}
 
 	if retryAfter != "" {
-		// Decrease concurrency if Retry-After is specified
-		if len(ch.sem) > MinConcurrency {
-			newSize := len(ch.sem) - 1
-			ch.logger.Info("Reducing concurrency due to Retry-After header", zap.Int("NewSize", newSize))
-			ch.ResizeSemaphore(newSize)
-		}
+		// Suggest decrease concurrency if Retry-After is specified
+		suggestion = -1
 	} else {
-		// Scale up if concurrency is below the maximum limit
-		if len(ch.sem) < MaxConcurrency {
-			newSize := len(ch.sem) + 1
-			ch.logger.Info("Increasing concurrency", zap.Int("NewSize", newSize))
-			ch.ResizeSemaphore(newSize)
+		// Suggest increase concurrency if currently below maximum limit and no other decrease suggestion has been made
+		if len(ch.sem) < MaxConcurrency && suggestion == 0 {
+			suggestion = 1
 		}
 	}
+
+	return suggestion
 }
 
-// MonitorServerResponseCodes monitors server response codes and adjusts concurrency accordingly.
-func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) {
+// MonitorServerResponseCodes monitors the response status codes and suggests a concurrency adjustment.
+func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) int {
 	statusCode := resp.StatusCode
 
 	// Lock the metrics to ensure thread safety
@@ -63,7 +77,6 @@ func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) {
 		ch.Metrics.TotalRateLimitErrors++
 	case statusCode >= 400 && statusCode < 500:
 		// Assuming 4xx errors as client errors
-		// Increase the TotalRetries count to indicate a client error
 		ch.Metrics.TotalRetries++
 	}
 
@@ -75,23 +88,19 @@ func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) {
 	// Set the new error rate in the metrics
 	ch.Metrics.ResponseCodeMetrics.ErrorRate = errorRate
 
-	// Check if the error rate exceeds the threshold and adjust concurrency accordingly
-	if errorRate > ErrorRateThreshold && len(ch.sem) > MinConcurrency {
-		// Decrease concurrency
-		newSize := len(ch.sem) - 1
-		ch.logger.Info("Reducing request concurrency due to high error rate", zap.Int("NewSize", newSize))
-		ch.ResizeSemaphore(newSize)
+	// Determine action based on the error rate
+	if errorRate > ErrorRateThreshold {
+		// Suggest decrease concurrency
+		return -1
 	} else if errorRate <= ErrorRateThreshold && len(ch.sem) < MaxConcurrency {
-		// Scale up if error rate is below the threshold and concurrency is below the maximum limit
-		newSize := len(ch.sem) + 1
-		ch.logger.Info("Increasing request concurrency due to low error rate", zap.Int("NewSize", newSize))
-		ch.ResizeSemaphore(newSize)
+		// Suggest increase concurrency if there is capacity
+		return 1
 	}
+	return 0
 }
 
-// MonitorResponseTimeVariability calculates the standard deviation of response times
-// and uses moving averages to smooth out fluctuations, adjusting concurrency accordingly.
-func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.Duration) {
+// MonitorResponseTimeVariability monitors the response time variability and suggests a concurrency adjustment.
+func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.Duration) int {
 	ch.Metrics.Lock.Lock()
 	defer ch.Metrics.Lock.Unlock()
 
@@ -110,17 +119,15 @@ func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.D
 	// Calculate standard deviation of response times
 	stdDev := math.Sqrt(ch.Metrics.ResponseTimeVariability.Variance)
 
-	// Adjust concurrency based on response time variability
-	if stdDev > ch.Metrics.ResponseTimeVariability.StdDevThreshold && len(ch.sem) > MinConcurrency {
-		newSize := len(ch.sem) - 1
-		ch.logger.Info("Reducing request concurrency due to high response time variability", zap.Int("NewSize", newSize))
-		ch.ResizeSemaphore(newSize)
+	// Determine action based on standard deviation
+	if stdDev > ch.Metrics.ResponseTimeVariability.StdDevThreshold {
+		// Suggest decrease concurrency
+		return -1
 	} else if stdDev <= ch.Metrics.ResponseTimeVariability.StdDevThreshold && len(ch.sem) < MaxConcurrency {
-		// Scale up if response time variability is below the threshold and concurrency is below the maximum limit
-		newSize := len(ch.sem) + 1
-		ch.logger.Info("Increasing request concurrency due to low response time variability", zap.Int("NewSize", newSize))
-		ch.ResizeSemaphore(newSize)
+		// Suggest increase concurrency if there is capacity
+		return 1
 	}
+	return 0
 }
 
 // calculateVariance calculates the variance of response times.
@@ -134,35 +141,3 @@ func (ch *ConcurrencyHandler) calculateVariance(averageResponseTime time.Duratio
 	ch.Metrics.ResponseTimeVariability.Variance = variance
 	return variance
 }
-
-// MonitorNetworkLatency measures Time to First Byte (TTFB) and monitors network throughput,
-// adjusting concurrency based on changes in network latency and throughput.
-func (ch *ConcurrencyHandler) MonitorNetworkLatency(ttfb time.Duration, throughput float64) {
-	ch.Metrics.Lock.Lock()
-	defer ch.Metrics.Lock.Unlock()
-
-	// Calculate the TTFB moving average
-	ch.Metrics.TTFB.Lock.Lock()
-	defer ch.Metrics.TTFB.Lock.Unlock()
-	ch.Metrics.TTFB.Total += ttfb
-	ch.Metrics.TTFB.Count++
-	ttfbMovingAverage := ch.Metrics.TTFB.Total / time.Duration(ch.Metrics.TTFB.Count)
-
-	// Calculate the throughput moving average
-	ch.Metrics.Throughput.Lock.Lock()
-	defer ch.Metrics.Throughput.Lock.Unlock()
-	ch.Metrics.Throughput.Total += throughput
-	ch.Metrics.Throughput.Count++
-	throughputMovingAverage := ch.Metrics.Throughput.Total / float64(ch.Metrics.Throughput.Count)
-
-	// Adjust concurrency based on TTFB and throughput moving averages
-	if ttfbMovingAverage > MaxAcceptableTTFB && len(ch.sem) > MinConcurrency {
-		newSize := len(ch.sem) - 1
-		ch.logger.Info("Reducing request concurrency due to high TTFB", zap.Int("NewSize", newSize))
-		ch.ResizeSemaphore(newSize)
-	} else if throughputMovingAverage > MaxAcceptableThroughput && len(ch.sem) < MaxConcurrency {
-		newSize := len(ch.sem) + 1
-		ch.logger.Info("Increasing request concurrency due to high throughput", zap.Int("NewSize", newSize))
-		ch.ResizeSemaphore(newSize)
-	}
-}
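
For reference, the decision logic introduced in EvaluateAndAdjustConcurrency is a simple majority vote over the three monitor suggestions (-1 to scale down, 0 to hold, 1 to scale up). The following is a minimal, self-contained sketch of that voting step only; the standalone names (decideScaling, main) are illustrative and not part of the package.

package main

import "fmt"

// decideScaling applies the same voting rule as EvaluateAndAdjustConcurrency:
// each monitor contributes -1 (scale down), 0 (no change), or 1 (scale up),
// and the majority wins; a tie means no change.
func decideScaling(suggestions []int) int {
    scaleDown, scaleUp := 0, 0
    for _, s := range suggestions {
        switch s {
        case -1:
            scaleDown++
        case 1:
            scaleUp++
        }
    }
    if scaleDown > scaleUp {
        return -1
    }
    if scaleUp > scaleDown {
        return 1
    }
    return 0
}

func main() {
    // Rate limit headers vote down, response codes are neutral,
    // response time variability votes up: a tie, so hold.
    fmt.Println(decideScaling([]int{-1, 0, 1})) // 0
    // Two monitors vote down: the handler would call ScaleDown.
    fmt.Println(decideScaling([]int{-1, -1, 1})) // -1
}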

concurrency/scale.go

Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
+package concurrency
+
+import "go.uber.org/zap"
+
+// ScaleDown reduces the concurrency level by one, down to the minimum limit.
+func (ch *ConcurrencyHandler) ScaleDown() {
+	// Lock to ensure thread safety
+	ch.lock.Lock()
+	defer ch.lock.Unlock()
+
+	// We must consider the capacity rather than the length of the semaphore channel
+	currentSize := cap(ch.sem)
+	if currentSize > MinConcurrency {
+		newSize := currentSize - 1
+		ch.logger.Info("Reducing request concurrency", zap.Int("currentSize", currentSize), zap.Int("newSize", newSize))
+		ch.ResizeSemaphore(newSize)
+	} else {
+		ch.logger.Info("Concurrency already at minimum level; cannot reduce further", zap.Int("currentSize", currentSize))
+	}
+}
+
+// ScaleUp increases the concurrency level by one, up to the maximum limit.
+func (ch *ConcurrencyHandler) ScaleUp() {
+	// Lock to ensure thread safety
+	ch.lock.Lock()
+	defer ch.lock.Unlock()
+
+	currentSize := cap(ch.sem)
+	if currentSize < MaxConcurrency {
+		newSize := currentSize + 1
+		ch.logger.Info("Increasing request concurrency", zap.Int("currentSize", currentSize), zap.Int("newSize", newSize))
+		ch.ResizeSemaphore(newSize)
+	} else {
+		ch.logger.Info("Concurrency already at maximum level; cannot increase further", zap.Int("currentSize", currentSize))
+	}
+}
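
The comment in ScaleDown about using capacity rather than length reflects how a buffered channel behaves as a counting semaphore: the capacity is the configured concurrency limit, while the length is the number of slots currently held by in-flight requests. A small standalone illustration of that distinction follows; the names here are hypothetical and not part of the package.

package main

import "fmt"

func main() {
    // A buffered channel used as a counting semaphore: capacity is the
    // concurrency limit, length is the number of slots currently in use.
    sem := make(chan struct{}, 5)

    // Two "requests" each acquire a slot.
    sem <- struct{}{}
    sem <- struct{}{}

    fmt.Println(cap(sem)) // 5: the configured limit, what ScaleUp/ScaleDown read
    fmt.Println(len(sem)) // 2: slots held right now, which varies from moment to moment

    // Releasing a slot when a request finishes.
    <-sem
    fmt.Println(len(sem)) // 1
}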

httpclient/request.go

Lines changed: 2 additions & 8 deletions
@@ -342,14 +342,8 @@ func (c *Client) executeRequest(method, endpoint string, body, out interface{})
 	// Calculate the duration between sending the request and receiving the response
 	duration := time.Since(startTime)
 
-	// Monitor response time variability
-	c.ConcurrencyHandler.MonitorResponseTimeVariability(duration)
-
-	// Monitor server response codes
-	c.ConcurrencyHandler.MonitorServerResponseCodes(resp)
-
-	// Monitor rate limit headers
-	c.ConcurrencyHandler.MonitorRateLimitHeaders(resp)
+	// Evaluate and adjust concurrency based on the request's feedback
+	c.ConcurrencyHandler.EvaluateAndAdjustConcurrency(resp, duration)
 
 	// Log outgoing cookies
 	log.LogCookies("incoming", req, method, endpoint)
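
The call site now passes both the response and the measured round-trip duration in a single call. A hedged sketch of that usage pattern follows; the interface and function names (concurrencyEvaluator, doWithFeedback) are stand-ins for the real httpclient plumbing, not part of this repository.

package example

import (
    "net/http"
    "time"
)

// concurrencyEvaluator captures the single entry point the HTTP client now
// needs from the concurrency handler (hypothetical interface for illustration).
type concurrencyEvaluator interface {
    EvaluateAndAdjustConcurrency(resp *http.Response, responseTime time.Duration)
}

// doWithFeedback times a request and feeds the outcome back to the handler,
// mirroring the change to executeRequest above.
func doWithFeedback(eval concurrencyEvaluator, client *http.Client, req *http.Request) (*http.Response, error) {
    start := time.Now()

    resp, err := client.Do(req)
    if err != nil {
        return nil, err
    }

    // One call replaces the three separate Monitor* calls removed in this commit.
    eval.EvaluateAndAdjustConcurrency(resp, time.Since(start))
    return resp, nil
}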
