Commit 3668635

Refactor ConcurrencyMetrics struct in handler.go to include additional metrics and locks
1 parent 011667b commit 3668635

5 files changed (+145, -94 lines)


concurrency/adjust_concurrency.go

Lines changed: 0 additions & 70 deletions
This file was deleted.

concurrency/const.go

Lines changed: 6 additions & 0 deletions
@@ -1,3 +1,4 @@
+// concurrency/const.go
 package concurrency
 
 import "time"
@@ -16,4 +17,9 @@ const (
     // MaxAcceptableResponseTimeVariability represents the maximum acceptable variability in response times.
     // It is used as a threshold to dynamically adjust concurrency based on fluctuations in response times.
     MaxAcceptableResponseTimeVariability = 500 * time.Millisecond
+
+    // ErrorRateThreshold represents the threshold for error rate above which concurrency will be adjusted.
+    // Error rate is calculated as (TotalRateLimitErrors + 5xxErrors) / TotalRequests.
+    // Adjustments in concurrency will be made if the error rate exceeds this threshold. A threshold of 0.1 (or 10%) is common.
+    ErrorRateThreshold = 0.1
 )
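As a rough worked example of the new threshold (the numbers here are illustrative, not taken from this commit): with 200 total requests and 25 responses counted as errors, the error rate is 25 / 200 = 0.125, which exceeds ErrorRateThreshold (0.1) and would trigger a one-step reduction in concurrency; at 18 errors out of 200 the rate is 0.09, and the handler would instead be eligible to scale back up.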

concurrency/handler.go

Lines changed: 15 additions & 8 deletions
@@ -25,14 +25,11 @@ type ConcurrencyHandler struct {
     Metrics *ConcurrencyMetrics
 }
 
-// ConcurrencyMetrics captures various metrics related to managing concurrency for the client's interactions with the API.// ConcurrencyMetrics captures various metrics related to managing concurrency for the client's interactions with the API.
+// ConcurrencyMetrics captures various metrics related to managing concurrency for the client's interactions with the API.
 type ConcurrencyMetrics struct {
     TotalRequests int64 // Total number of requests made
     TotalRetries int64 // Total number of retry attempts
     TotalRateLimitErrors int64 // Total number of rate limit errors encountered
-    TotalResponseTime time.Duration // Total response time for all requests
-    AverageResponseTime time.Duration // Average response time across all requests
-    ErrorRate float64 // Error rate calculated as (TotalRateLimitErrors + 5xxErrors) / TotalRequests
     TokenWaitTime time.Duration // Total time spent waiting for tokens
     TTFB struct { // Metrics related to Time to First Byte (TTFB)
         Total time.Duration // Total Time to First Byte (TTFB) for all requests
@@ -42,11 +39,21 @@ type ConcurrencyMetrics struct {
     Throughput struct { // Metrics related to network throughput
         Total float64 // Total network throughput for all requests
         Count int64 // Count of requests used for calculating throughput
-        Lock sync.Mutex // Lock for throughput metrics
+        Lock sync.Mutex // Lock for throughput metrics/
     }
-    Variance float64 // Variance of response times
-    ResponseCount int64 // Count of responses used for calculating response time variability
-    Lock sync.Mutex // Lock for overall metrics fields
+    ResponseTimeVariability struct { // Metrics related to response time variability
+        Total time.Duration // Total response time for all requests
+        Average time.Duration // Average response time across all requests
+        Variance float64 // Variance of response times
+        Count int64 // Count of responses used for calculating response time variability
+        Lock sync.Mutex // Lock for response time variability metrics
+        StdDevThreshold float64 // Maximum acceptable standard deviation for adjusting concurrency
+    }
+    ResponseCodeMetrics struct {
+        ErrorRate float64 // Error rate calculated as (TotalRateLimitErrors + 5xxErrors) / TotalRequests
+        Lock sync.Mutex // Lock for response code metrics
+    }
+    Lock sync.Mutex // Lock for overall metrics fields
 }
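For context, a minimal sketch of how the per-group locks in the refactored struct are intended to be used; the recordThroughputSample helper below is hypothetical (it is not part of this commit) and only illustrates taking the Throughput group's own lock instead of the top-level Metrics lock.

// Hypothetical helper (not in this commit): records one throughput sample
// under the Throughput group's own lock.
func (ch *ConcurrencyHandler) recordThroughputSample(sample float64) {
    ch.Metrics.Throughput.Lock.Lock()
    defer ch.Metrics.Throughput.Lock.Unlock()

    ch.Metrics.Throughput.Total += sample
    ch.Metrics.Throughput.Count++
}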
 
 // NewConcurrencyHandler initializes a new ConcurrencyHandler with the given

concurrency/metrics.go

Lines changed: 88 additions & 16 deletions
@@ -3,27 +3,90 @@ package concurrency
 import (
     "math"
     "net/http"
+    "strconv"
     "time"
 
     "go.uber.org/zap"
 )
 
 // MonitorRateLimitHeaders monitors the rate limit headers (X-RateLimit-Remaining and Retry-After)
 // in the HTTP response and adjusts concurrency accordingly.
+// If X-RateLimit-Remaining is below a threshold or Retry-After is specified, decrease concurrency.
+// If neither condition is met, consider scaling up if concurrency is below the maximum limit.
+// - Threshold for X-RateLimit-Remaining: 10
+// - Maximum concurrency: MaxConcurrency
 func (ch *ConcurrencyHandler) MonitorRateLimitHeaders(resp *http.Response) {
     // Extract X-RateLimit-Remaining and Retry-After headers from the response
     remaining := resp.Header.Get("X-RateLimit-Remaining")
     retryAfter := resp.Header.Get("Retry-After")
 
-    // Adjust concurrency based on the values of these headers
-    // Implement your logic here to dynamically adjust concurrency
+    if remaining != "" {
+        remainingValue, err := strconv.Atoi(remaining)
+        if err == nil && remainingValue < 10 {
+            // Decrease concurrency if X-RateLimit-Remaining is below the threshold
+            if len(ch.sem) > MinConcurrency {
+                newSize := len(ch.sem) - 1
+                ch.logger.Info("Reducing concurrency due to low X-RateLimit-Remaining", zap.Int("NewSize", newSize))
+                ch.ResizeSemaphore(newSize)
+            }
+        }
+    }
+
+    if retryAfter != "" {
+        // Decrease concurrency if Retry-After is specified
+        if len(ch.sem) > MinConcurrency {
+            newSize := len(ch.sem) - 1
+            ch.logger.Info("Reducing concurrency due to Retry-After header", zap.Int("NewSize", newSize))
+            ch.ResizeSemaphore(newSize)
+        }
+    } else {
+        // Scale up if concurrency is below the maximum limit
+        if len(ch.sem) < MaxConcurrency {
+            newSize := len(ch.sem) + 1
+            ch.logger.Info("Increasing concurrency", zap.Int("NewSize", newSize))
+            ch.ResizeSemaphore(newSize)
+        }
+    }
 }
 
 // MonitorServerResponseCodes monitors server response codes and adjusts concurrency accordingly.
 func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) {
     statusCode := resp.StatusCode
-    // Check for 5xx errors (server errors) and 4xx errors (client errors)
-    // Implement your logic here to track increases in error rates and adjust concurrency
+
+    // Lock the metrics to ensure thread safety
+    ch.Metrics.Lock.Lock()
+    defer ch.Metrics.Lock.Unlock()
+
+    // Update the appropriate error count based on the response status code
+    switch {
+    case statusCode >= 500 && statusCode < 600:
+        ch.Metrics.TotalRateLimitErrors++
+    case statusCode >= 400 && statusCode < 500:
+        // Assuming 4xx errors as client errors
+        // Increase the TotalRetries count to indicate a client error
+        ch.Metrics.TotalRetries++
+    }
+
+    // Calculate error rate
+    totalRequests := float64(ch.Metrics.TotalRequests)
+    totalErrors := float64(ch.Metrics.TotalRateLimitErrors + ch.Metrics.TotalRetries)
+    errorRate := totalErrors / totalRequests
+
+    // Set the new error rate in the metrics
+    ch.Metrics.ResponseCodeMetrics.ErrorRate = errorRate
+
+    // Check if the error rate exceeds the threshold and adjust concurrency accordingly
+    if errorRate > ErrorRateThreshold && len(ch.sem) > MinConcurrency {
+        // Decrease concurrency
+        newSize := len(ch.sem) - 1
+        ch.logger.Info("Reducing request concurrency due to high error rate", zap.Int("NewSize", newSize))
+        ch.ResizeSemaphore(newSize)
+    } else if errorRate <= ErrorRateThreshold && len(ch.sem) < MaxConcurrency {
+        // Scale up if error rate is below the threshold and concurrency is below the maximum limit
+        newSize := len(ch.sem) + 1
+        ch.logger.Info("Increasing request concurrency due to low error rate", zap.Int("NewSize", newSize))
+        ch.ResizeSemaphore(newSize)
+    }
 }
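A short usage sketch for the two monitors above; the doRequest wrapper is an assumption for illustration only, since the commit does not show where these functions are called from.

// Hypothetical wiring (not in this commit): feed every response through the monitors.
func (ch *ConcurrencyHandler) doRequest(client *http.Client, req *http.Request) (*http.Response, error) {
    resp, err := client.Do(req)
    if err != nil {
        return nil, err
    }

    // Let the handler react to rate limit headers and response status codes.
    ch.MonitorRateLimitHeaders(resp)
    ch.MonitorServerResponseCodes(resp)
    return resp, nil
}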
 
 // MonitorResponseTimeVariability calculates the standard deviation of response times
@@ -32,21 +95,30 @@ func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.D
     ch.Metrics.Lock.Lock()
     defer ch.Metrics.Lock.Unlock()
 
-    // Update TotalResponseTime and ResponseCount for moving average calculation
-    ch.Metrics.TotalResponseTime += responseTime
-    ch.Metrics.ResponseCount++
+    // Update ResponseTimeVariability metrics
+    ch.Metrics.ResponseTimeVariability.Lock.Lock()
+    defer ch.Metrics.ResponseTimeVariability.Lock.Unlock()
+    ch.Metrics.ResponseTimeVariability.Total += responseTime
+    ch.Metrics.ResponseTimeVariability.Count++
 
     // Calculate average response time
-    averageResponseTime := ch.Metrics.TotalResponseTime / time.Duration(ch.Metrics.ResponseCount)
+    ch.Metrics.ResponseTimeVariability.Average = ch.Metrics.ResponseTimeVariability.Total / time.Duration(ch.Metrics.ResponseTimeVariability.Count)
+
+    // Calculate variance of response times
+    ch.Metrics.ResponseTimeVariability.Variance = ch.calculateVariance(ch.Metrics.ResponseTimeVariability.Average, responseTime)
 
     // Calculate standard deviation of response times
-    variance := ch.calculateVariance(averageResponseTime, responseTime)
-    stdDev := math.Sqrt(variance)
+    stdDev := math.Sqrt(ch.Metrics.ResponseTimeVariability.Variance)
 
     // Adjust concurrency based on response time variability
-    if float64(stdDev) > MaxAcceptableResponseTimeVariability.Seconds() && len(ch.sem) > MinConcurrency {
+    if stdDev > ch.Metrics.ResponseTimeVariability.StdDevThreshold && len(ch.sem) > MinConcurrency {
         newSize := len(ch.sem) - 1
-        ch.logger.Info("Reducing concurrency due to high response time variability", zap.Int("NewSize", newSize))
+        ch.logger.Info("Reducing request concurrency due to high response time variability", zap.Int("NewSize", newSize))
+        ch.ResizeSemaphore(newSize)
+    } else if stdDev <= ch.Metrics.ResponseTimeVariability.StdDevThreshold && len(ch.sem) < MaxConcurrency {
+        // Scale up if response time variability is below the threshold and concurrency is below the maximum limit
+        newSize := len(ch.sem) + 1
+        ch.logger.Info("Increasing request concurrency due to low response time variability", zap.Int("NewSize", newSize))
         ch.ResizeSemaphore(newSize)
     }
 }
@@ -58,8 +130,8 @@ func (ch *ConcurrencyHandler) calculateVariance(averageResponseTime time.Duratio
     responseSeconds := responseTime.Seconds()
 
     // Calculate variance
-    variance := (float64(ch.Metrics.ResponseCount-1)*math.Pow(averageSeconds-responseSeconds, 2) + ch.Metrics.Variance) / float64(ch.Metrics.ResponseCount)
-    ch.Metrics.Variance = variance
+    variance := (float64(ch.Metrics.ResponseTimeVariability.Count-1)*math.Pow(averageSeconds-responseSeconds, 2) + ch.Metrics.ResponseTimeVariability.Variance) / float64(ch.Metrics.ResponseTimeVariability.Count)
+    ch.Metrics.ResponseTimeVariability.Variance = variance
     return variance
 }
 
@@ -86,11 +158,11 @@ func (ch *ConcurrencyHandler) MonitorNetworkLatency(ttfb time.Duration, throughp
     // Adjust concurrency based on TTFB and throughput moving averages
     if ttfbMovingAverage > MaxAcceptableTTFB && len(ch.sem) > MinConcurrency {
         newSize := len(ch.sem) - 1
-        ch.logger.Info("Reducing concurrency due to high TTFB", zap.Int("NewSize", newSize))
+        ch.logger.Info("Reducing request concurrency due to high TTFB", zap.Int("NewSize", newSize))
         ch.ResizeSemaphore(newSize)
     } else if throughputMovingAverage > MaxAcceptableThroughput && len(ch.sem) < MaxConcurrency {
         newSize := len(ch.sem) + 1
-        ch.logger.Info("Increasing concurrency due to high throughput", zap.Int("NewSize", newSize))
+        ch.logger.Info("Increasing request concurrency due to high throughput", zap.Int("NewSize", newSize))
         ch.ResizeSemaphore(newSize)
     }
 }
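On the calculateVariance change above: as the code reads, each sample updates a running estimate as variance = ((Count-1) * (average - sample)^2 + previousVariance) / Count, with times converted to seconds. For example (illustrative numbers), with Count = 4, an average of 1.0s, a previous variance of 0.02, and a new response time of 1.2s, the update gives (3 * 0.04 + 0.02) / 4 = 0.035; its square root (about 0.19s) is what MonitorResponseTimeVariability compares against StdDevThreshold. This is a simplified running estimate rather than a textbook Welford update.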

concurrency/resize.go

Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
+// concurrency/resize.go
+
+package concurrency
+
+// ResizeSemaphore adjusts the size of the semaphore used to control concurrency. This method creates a new
+// semaphore with the specified new size and closes the old semaphore to ensure that no further tokens can
+// be acquired from it. This approach helps manage the transition from the old concurrency level to the new one
+// without affecting ongoing operations significantly.
+//
+// Parameters:
+// - newSize: The new size for the semaphore, representing the updated limit on concurrent requests.
+//
+// This function should be called from within synchronization contexts, such as AdjustConcurrency, to avoid
+// race conditions and ensure that changes to the semaphore are consistent with the observed metrics.
+func (ch *ConcurrencyHandler) ResizeSemaphore(newSize int) {
+    newSem := make(chan struct{}, newSize)
+
+    // Transfer tokens from the old semaphore to the new one.
+    for {
+        select {
+        case token := <-ch.sem:
+            select {
+            case newSem <- token:
+                // Token transferred to new semaphore.
+            default:
+                // New semaphore is full, put token back to the old one to allow ongoing operations to complete.
+                ch.sem <- token
+            }
+        default:
+            // No more tokens to transfer.
+            close(ch.sem)
+            ch.sem = newSem
+            return
+        }
+    }
+}
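Finally, a minimal sketch of the acquire/release pattern this semaphore implies, assuming one token per in-flight request; the withToken helper is hypothetical and glosses over the coordination a real caller would need around a concurrent resize.

// Hypothetical usage (not in this commit): hold a token for the duration of one unit of work.
func (ch *ConcurrencyHandler) withToken(work func()) {
    ch.sem <- struct{}{}        // acquire a slot; blocks while concurrency is at its limit
    defer func() { <-ch.sem }() // release the slot when the work finishes

    work()
}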
