Commit e20d59b

Merge pull request #171 from deploymenttheory/dev
Refactor concurrency package to include new metrics and adjust concur…
2 parents (3c81bed + ec2d7ec), commit e20d59b

10 files changed: +198 -404 lines changed

concurrency/const.go

Lines changed: 0 additions & 19 deletions

@@ -4,16 +4,6 @@ package concurrency
 import "time"
 
 const (
-	// MaxConcurrency defines the upper limit of concurrent requests the system can handle.
-	MaxConcurrency = 10
-
-	// MinConcurrency defines the lower limit of concurrent requests the system will maintain.
-	MinConcurrency = 1
-
-	// EvaluationInterval specifies the frequency at which the system evaluates its performance metrics
-	// to decide if concurrency adjustments are needed.
-	EvaluationInterval = 1 * time.Minute
-
 	// MaxAcceptableTTFB represents the maximum acceptable Time to First Byte (TTFB) in milliseconds.
 	// TTFB is the time taken for the server to start sending the first byte of data in response to a request.
 	// Adjustments in concurrency will be made if the TTFB exceeds this threshold.
@@ -32,13 +22,4 @@ const (
 	// Error rate is calculated as (TotalRateLimitErrors + 5xxErrors) / TotalRequests.
 	// Adjustments in concurrency will be made if the error rate exceeds this threshold. A threshold of 0.1 (or 10%) is common.
 	ErrorRateThreshold = 0.1
-
-	// Weight assigned to each metric feedback type
-	WeightRateLimit     = 0.5 // Weight for rate limit feedback, less if not all APIs provide this data
-	WeightResponseCodes = 1.0 // Weight for server response codes
-	WeightResponseTime  = 1.5 // Higher weight for response time variability
-
-	// Thresholds for semaphore scaling actions
-	ThresholdScaleDown = -1.5 // Threshold to decide scaling down
-	ThresholdScaleUp   = 1.5  // Threshold to decide scaling up
 )
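For reference, the error-rate rule that remains in const.go can be worked through on its own. The sketch below is illustrative only: the counter values and the main wrapper are invented, and the only value taken from the file above is ErrorRateThreshold = 0.1.

package main

import "fmt"

// ErrorRateThreshold mirrors the constant kept in concurrency/const.go.
const ErrorRateThreshold = 0.1

func main() {
	// Hypothetical counters, for illustration only.
	totalRequests := 250.0
	totalRateLimitErrors := 12.0
	total5xxErrors := 18.0

	// Error rate = (TotalRateLimitErrors + 5xxErrors) / TotalRequests,
	// as described in the comment retained in const.go.
	errorRate := (totalRateLimitErrors + total5xxErrors) / totalRequests
	fmt.Printf("error rate: %.2f\n", errorRate) // prints 0.12

	if errorRate > ErrorRateThreshold {
		fmt.Println("error rate above threshold; a scale-down would be suggested")
	}
}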

concurrency/handler.go

Lines changed: 18 additions & 31 deletions

@@ -6,26 +6,23 @@ import (
 	"time"
 
 	"github.com/deploymenttheory/go-api-http-client/logger"
-	"golang.org/x/sync/semaphore"
 )
 
-// ConcurrencyHandler controls the number of concurrent HTTP requests.
-// type ConcurrencyHandler struct {
-// 	sem                      chan struct{}
-// 	logger                   logger.Logger
-// 	AcquisitionTimes         []time.Duration
-// 	lock                     sync.Mutex
-// 	lastTokenAcquisitionTime time.Time
-// 	Metrics                  *ConcurrencyMetrics
-// }
+// Constants and Data Structures:
+const (
+	MaxConcurrency     = 10              // Maximum allowed concurrent requests
+	MinConcurrency     = 1               // Minimum allowed concurrent requests
+	EvaluationInterval = 1 * time.Minute // Time interval for evaluating metrics and adjusting concurrency
+)
 
+// ConcurrencyHandler controls the number of concurrent HTTP requests.
 type ConcurrencyHandler struct {
-	sem             *semaphore.Weighted
-	lock            sync.RWMutex
-	logger          logger.Logger
-	Metrics         *ConcurrencyMetrics
-	currentCapacity int64
-	activePermits   int64
+	sem                      chan struct{}
+	logger                   logger.Logger
+	AcquisitionTimes         []time.Duration
+	lock                     sync.Mutex
+	lastTokenAcquisitionTime time.Time
+	Metrics                  *ConcurrencyMetrics
 }
 
 // ConcurrencyMetrics captures various metrics related to managing concurrency for the client's interactions with the API.
@@ -63,22 +60,12 @@ type ConcurrencyMetrics struct {
 // concurrency limit, logger, and concurrency metrics. The ConcurrencyHandler ensures
 // no more than a certain number of concurrent requests are made.
 // It uses a semaphore to control concurrency.
-//
-// func NewConcurrencyHandler(limit int, logger logger.Logger, metrics *ConcurrencyMetrics) *ConcurrencyHandler {
-// 	return &ConcurrencyHandler{
-// 		sem:              make(chan struct{}, limit),
-// 		logger:           logger,
-// 		AcquisitionTimes: []time.Duration{},
-// 		Metrics:          metrics,
-// 	}
-// }
-func NewConcurrencyHandler(limit int64, logger logger.Logger, metrics *ConcurrencyMetrics) *ConcurrencyHandler {
+func NewConcurrencyHandler(limit int, logger logger.Logger, metrics *ConcurrencyMetrics) *ConcurrencyHandler {
 	return &ConcurrencyHandler{
-		sem:             semaphore.NewWeighted(limit),
-		logger:          logger,
-		Metrics:         metrics,
-		currentCapacity: limit,
-		activePermits:   0,
+		sem:              make(chan struct{}, limit),
+		logger:           logger,
+		AcquisitionTimes: []time.Duration{},
+		Metrics:          metrics,
 	}
 }
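This commit drops the semaphore.Weighted-based handler and returns to a plain buffered channel. A minimal, self-contained sketch of that pattern follows; the acquire/release helpers and the main driver are assumptions for illustration, not code from this repository, but they show why a chan struct{} with capacity limit bounds the number of in-flight requests and why len(sem) reflects the slots currently held (as used by the monitors in metrics.go).

package main

import (
	"fmt"
	"time"
)

// acquire takes a slot by sending into the buffered channel; it blocks when
// the buffer is already full, i.e. when the concurrency limit is reached.
// (Hypothetical helper, not part of the package.)
func acquire(sem chan struct{}) { sem <- struct{}{} }

// release frees a slot by receiving from the channel.
func release(sem chan struct{}) { <-sem }

func main() {
	const limit = 3 // stands in for the MaxConcurrency-style cap
	sem := make(chan struct{}, limit)

	for i := 0; i < 5; i++ {
		acquire(sem)
		go func(id int) {
			defer release(sem)
			time.Sleep(10 * time.Millisecond) // simulated HTTP request
			fmt.Println("request done:", id)
		}(i)
	}

	// Draining all slots waits for the remaining in-flight work to finish.
	for i := 0; i < limit; i++ {
		acquire(sem)
	}
}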

concurrency/metrics.go

Lines changed: 87 additions & 124 deletions

@@ -27,57 +27,49 @@ import (
 //
 // It logs the specific reason for scaling decisions, helping in traceability and fine-tuning system performance.
 func (ch *ConcurrencyHandler) EvaluateAndAdjustConcurrency(resp *http.Response, responseTime time.Duration) {
+	// Call monitoring functions
 	rateLimitFeedback := ch.MonitorRateLimitHeaders(resp)
 	responseCodeFeedback := ch.MonitorServerResponseCodes(resp)
 	responseTimeFeedback := ch.MonitorResponseTimeVariability(responseTime)
 
-	// Compute the weighted feedback
-	weightedFeedback := float64(rateLimitFeedback)*WeightRateLimit +
-		float64(responseCodeFeedback)*WeightResponseCodes +
-		float64(responseTimeFeedback)*WeightResponseTime
-
-	// Log the feedback and weighted result for debugging
+	// Log the feedback from each monitoring function for debugging
 	ch.logger.Debug("Concurrency Adjustment Feedback",
-		zap.Float64("WeightedFeedback", weightedFeedback))
+		zap.Int("RateLimitFeedback", rateLimitFeedback),
+		zap.Int("ResponseCodeFeedback", responseCodeFeedback),
+		zap.Int("ResponseTimeFeedback", responseTimeFeedback))
+
+	// Determine overall action based on feedback
+	suggestions := []int{rateLimitFeedback, responseCodeFeedback, responseTimeFeedback}
+	scaleDownCount := 0
+	scaleUpCount := 0
+
+	for _, suggestion := range suggestions {
+		switch suggestion {
+		case -1:
+			scaleDownCount++
+		case 1:
+			scaleUpCount++
+		}
+	}
 
-	// Apply thresholds to determine scaling action
-	if weightedFeedback <= ThresholdScaleDown {
-		ch.logger.Info("Scaling down the concurrency", zap.Float64("WeightedFeedback", weightedFeedback))
+	// Log the counts for scale down and up suggestions
+	ch.logger.Info("Scaling Decision Counts",
+		zap.Int("ScaleDownCount", scaleDownCount),
+		zap.Int("ScaleUpCount", scaleUpCount))
+
+	// Decide on scaling action
+	if scaleDownCount > scaleUpCount {
+		ch.logger.Info("Scaling down the concurrency", zap.String("Reason", "More signals suggested to decrease concurrency"))
 		ch.ScaleDown()
-	} else if weightedFeedback >= ThresholdScaleUp {
-		ch.logger.Info("Scaling up the concurrency", zap.Float64("WeightedFeedback", weightedFeedback))
+	} else if scaleUpCount > scaleDownCount {
+		ch.logger.Info("Scaling up the concurrency", zap.String("Reason", "More signals suggested to increase concurrency"))
 		ch.ScaleUp()
 	} else {
-		ch.logger.Info("Maintaining current concurrency level", zap.Float64("WeightedFeedback", weightedFeedback))
+		ch.logger.Info("No change in concurrency", zap.String("Reason", "Equal signals for both scaling up and down"))
 	}
 }
 
 // MonitorRateLimitHeaders monitors the rate limit headers in the response and suggests a concurrency adjustment.
-// func (ch *ConcurrencyHandler) MonitorRateLimitHeaders(resp *http.Response) int {
-// 	remaining := resp.Header.Get("X-RateLimit-Remaining")
-// 	retryAfter := resp.Header.Get("Retry-After")
-// 	suggestion := 0
-
-// 	if remaining != "" {
-// 		remainingValue, err := strconv.Atoi(remaining)
-// 		if err == nil && remainingValue < 10 {
-// 			// Suggest decrease concurrency if X-RateLimit-Remaining is below the threshold
-// 			suggestion = -1
-// 		}
-// 	}
-
-// 	if retryAfter != "" {
-// 		// Suggest decrease concurrency if Retry-After is specified
-// 		suggestion = -1
-// 	} else {
-// 		// Suggest increase concurrency if currently below maximum limit and no other decrease suggestion has been made
-// 		if len(ch.sem) < MaxConcurrency && suggestion == 0 {
-// 			suggestion = 1
-// 		}
-// 	}
-
-// 	return suggestion
-// }
 func (ch *ConcurrencyHandler) MonitorRateLimitHeaders(resp *http.Response) int {
 	remaining := resp.Header.Get("X-RateLimit-Remaining")
 	retryAfter := resp.Header.Get("Retry-After")
@@ -86,128 +78,99 @@ func (ch *ConcurrencyHandler) MonitorRateLimitHeaders(resp *http.Response) int {
 	if remaining != "" {
 		remainingValue, err := strconv.Atoi(remaining)
 		if err == nil && remainingValue < 10 {
+			// Suggest decrease concurrency if X-RateLimit-Remaining is below the threshold
 			suggestion = -1
 		}
 	}
 
 	if retryAfter != "" {
+		// Suggest decrease concurrency if Retry-After is specified
 		suggestion = -1
+	} else {
+		// Suggest increase concurrency if currently below maximum limit and no other decrease suggestion has been made
+		if len(ch.sem) < MaxConcurrency && suggestion == 0 {
+			suggestion = 1
+		}
 	}
 
 	return suggestion
 }
 
 // MonitorServerResponseCodes monitors the response status codes and suggests a concurrency adjustment.
-// func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) int {
-// 	statusCode := resp.StatusCode
-
-// 	// Lock the metrics to ensure thread safety
-// 	ch.Metrics.Lock.Lock()
-// 	defer ch.Metrics.Lock.Unlock()
-
-// 	// Update the appropriate error count based on the response status code
-// 	switch {
-// 	case statusCode >= 500 && statusCode < 600:
-// 		ch.Metrics.TotalRateLimitErrors++
-// 	case statusCode >= 400 && statusCode < 500:
-// 		// Assuming 4xx errors as client errors
-// 		ch.Metrics.TotalRetries++
-// 	}
-
-// 	// Calculate error rate
-// 	totalRequests := float64(ch.Metrics.TotalRequests)
-// 	totalErrors := float64(ch.Metrics.TotalRateLimitErrors + ch.Metrics.TotalRetries)
-// 	errorRate := totalErrors / totalRequests
-
-// 	// Set the new error rate in the metrics
-// 	ch.Metrics.ResponseCodeMetrics.ErrorRate = errorRate
-
-// 	// Determine action based on the error rate
-// 	if errorRate > ErrorRateThreshold {
-// 		// Suggest decrease concurrency
-// 		return -1
-// 	} else if errorRate <= ErrorRateThreshold && len(ch.sem) < MaxConcurrency {
-// 		// Suggest increase concurrency if there is capacity
-// 		return 1
-// 	}
-// 	return 0
-// }
 func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) int {
 	statusCode := resp.StatusCode
+
+	// Lock the metrics to ensure thread safety
 	ch.Metrics.Lock.Lock()
 	defer ch.Metrics.Lock.Unlock()
 
-	if statusCode >= 500 {
+	// Update the appropriate error count based on the response status code
+	switch {
+	case statusCode >= 500 && statusCode < 600:
 		ch.Metrics.TotalRateLimitErrors++
-		return -1
-	} else if statusCode >= 400 {
+	case statusCode >= 400 && statusCode < 500:
+		// Assuming 4xx errors as client errors
 		ch.Metrics.TotalRetries++
-		return -1
 	}
 
+	// Calculate error rate
+	totalRequests := float64(ch.Metrics.TotalRequests)
+	totalErrors := float64(ch.Metrics.TotalRateLimitErrors + ch.Metrics.TotalRetries)
+	errorRate := totalErrors / totalRequests
+
+	// Set the new error rate in the metrics
+	ch.Metrics.ResponseCodeMetrics.ErrorRate = errorRate
+
+	// Determine action based on the error rate
+	if errorRate > ErrorRateThreshold {
+		// Suggest decrease concurrency
+		return -1
+	} else if errorRate <= ErrorRateThreshold && len(ch.sem) < MaxConcurrency {
+		// Suggest increase concurrency if there is capacity
+		return 1
+	}
 	return 0
 }
 
 // MonitorResponseTimeVariability monitors the response time variability and suggests a concurrency adjustment.
-// func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.Duration) int {
-// 	ch.Metrics.Lock.Lock()
-// 	defer ch.Metrics.Lock.Unlock()
-
-// 	// Update ResponseTimeVariability metrics
-// 	ch.Metrics.ResponseTimeVariability.Lock.Lock()
-// 	defer ch.Metrics.ResponseTimeVariability.Lock.Unlock()
-// 	ch.Metrics.ResponseTimeVariability.Total += responseTime
-// 	ch.Metrics.ResponseTimeVariability.Count++
-
-// 	// Calculate average response time
-// 	ch.Metrics.ResponseTimeVariability.Average = ch.Metrics.ResponseTimeVariability.Total / time.Duration(ch.Metrics.ResponseTimeVariability.Count)
-
-// 	// Calculate variance of response times
-// 	ch.Metrics.ResponseTimeVariability.Variance = ch.calculateVariance(ch.Metrics.ResponseTimeVariability.Average, responseTime)
-
-// 	// Calculate standard deviation of response times
-// 	stdDev := math.Sqrt(ch.Metrics.ResponseTimeVariability.Variance)
-
-// 	// Determine action based on standard deviation
-// 	if stdDev > ch.Metrics.ResponseTimeVariability.StdDevThreshold {
-// 		// Suggest decrease concurrency
-// 		return -1
-// 	} else if stdDev <= ch.Metrics.ResponseTimeVariability.StdDevThreshold && len(ch.sem) < MaxConcurrency {
-// 		// Suggest increase concurrency if there is capacity
-// 		return 1
-// 	}
-// 	return 0
-// }
 func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.Duration) int {
 	ch.Metrics.Lock.Lock()
	defer ch.Metrics.Lock.Unlock()
 
-	// Update total response time and count
+	// Update ResponseTimeVariability metrics
+	ch.Metrics.ResponseTimeVariability.Lock.Lock()
+	defer ch.Metrics.ResponseTimeVariability.Lock.Unlock()
 	ch.Metrics.ResponseTimeVariability.Total += responseTime
 	ch.Metrics.ResponseTimeVariability.Count++
 
-	// Calculate the average response time
-	averageResponseTime := ch.Metrics.ResponseTimeVariability.Total / time.Duration(ch.Metrics.ResponseTimeVariability.Count)
+	// Calculate average response time
+	ch.Metrics.ResponseTimeVariability.Average = ch.Metrics.ResponseTimeVariability.Total / time.Duration(ch.Metrics.ResponseTimeVariability.Count)
 
-	// Calculate variance
-	variance := ch.calculateVariance(averageResponseTime, responseTime)
-	// Calculate standard deviation
-	stdDev := math.Sqrt(variance)
+	// Calculate variance of response times
+	ch.Metrics.ResponseTimeVariability.Variance = ch.calculateVariance(ch.Metrics.ResponseTimeVariability.Average, responseTime)
 
-	// Convert MaxAcceptableResponseTimeVariability to seconds for comparison
-	maxStdDev := MaxAcceptableResponseTimeVariability.Seconds()
+	// Calculate standard deviation of response times
+	stdDev := math.Sqrt(ch.Metrics.ResponseTimeVariability.Variance)
 
-	if stdDev > maxStdDev {
-		return -1 // Suggest to decrease concurrency if stdDev exceeds the maximum threshold
+	// Determine action based on standard deviation
+	if stdDev > ch.Metrics.ResponseTimeVariability.StdDevThreshold {
+		// Suggest decrease concurrency
+		return -1
+	} else if stdDev <= ch.Metrics.ResponseTimeVariability.StdDevThreshold && len(ch.sem) < MaxConcurrency {
+		// Suggest increase concurrency if there is capacity
+		return 1
 	}
-	return 1 // Suggest to increase concurrency if stdDev is within the acceptable range
+	return 0
 }
 
-// calculateVariance calculates the variance between the average response time and a new sample.
-func (ch *ConcurrencyHandler) calculateVariance(average, newSample time.Duration) float64 {
-	mean := average.Seconds()       // Convert to seconds
-	newValue := newSample.Seconds() // Convert to seconds
-	newVariance := (float64(ch.Metrics.ResponseTimeVariability.Count-1)*math.Pow(mean-newValue, 2) + ch.Metrics.ResponseTimeVariability.Variance) / float64(ch.Metrics.ResponseTimeVariability.Count)
-	ch.Metrics.ResponseTimeVariability.Variance = newVariance // Update the variance in metrics
-	return newVariance
+// calculateVariance calculates the variance of response times.
+func (ch *ConcurrencyHandler) calculateVariance(averageResponseTime time.Duration, responseTime time.Duration) float64 {
+	// Convert time.Duration values to seconds
+	averageSeconds := averageResponseTime.Seconds()
+	responseSeconds := responseTime.Seconds()
+
+	// Calculate variance
+	variance := (float64(ch.Metrics.ResponseTimeVariability.Count-1)*math.Pow(averageSeconds-responseSeconds, 2) + ch.Metrics.ResponseTimeVariability.Variance) / float64(ch.Metrics.ResponseTimeVariability.Count)
+	ch.Metrics.ResponseTimeVariability.Variance = variance
+	return variance
 }
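The weighted-feedback model (WeightRateLimit, ThresholdScaleUp and friends) is replaced above by a simple vote count over the three monitors. The standalone sketch below reduces that decision rule to a single hypothetical helper; decide is not a function in the package, it only mirrors the counting loop in EvaluateAndAdjustConcurrency.

package main

import "fmt"

// decide mirrors the counting logic in EvaluateAndAdjustConcurrency: each
// monitor votes -1 (scale down), 0 (no opinion) or 1 (scale up); the larger
// count of non-zero votes wins, and a tie leaves concurrency unchanged.
func decide(suggestions ...int) string {
	scaleDown, scaleUp := 0, 0
	for _, s := range suggestions {
		switch s {
		case -1:
			scaleDown++
		case 1:
			scaleUp++
		}
	}
	switch {
	case scaleDown > scaleUp:
		return "scale down"
	case scaleUp > scaleDown:
		return "scale up"
	default:
		return "no change"
	}
}

func main() {
	fmt.Println(decide(-1, 0, 1))  // rate limit says down, variability says up: no change
	fmt.Println(decide(-1, -1, 1)) // two signals to back off win: scale down
}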

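The new calculateVariance keeps a running variance of response times, which MonitorResponseTimeVariability turns into a standard deviation and compares against StdDevThreshold. The sketch below works that update through on invented numbers; updateVariance is an illustrative extraction of the formula, not part of the package.

package main

import (
	"fmt"
	"math"
)

// updateVariance applies the same incremental update as calculateVariance:
// newVariance = ((count-1)*(mean-sample)^2 + oldVariance) / count,
// with mean and sample expressed in seconds.
func updateVariance(oldVariance float64, count int64, mean, sample float64) float64 {
	return (float64(count-1)*math.Pow(mean-sample, 2) + oldVariance) / float64(count)
}

func main() {
	// Hypothetical running state: average 0.30s, variance 0.0025, and a fifth
	// sample of 0.50s just recorded.
	var (
		count    int64 = 5
		mean           = 0.30
		variance       = 0.0025
		sample         = 0.50
	)

	variance = updateVariance(variance, count, mean, sample)
	fmt.Printf("variance: %.4f, std dev: %.3fs\n", variance, math.Sqrt(variance)) // ~0.0325 and ~0.180s
}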