
Commit 01ac595

Merge pull request #126 from deploymenttheory/dev
Refactor
2 parents 4732100 + 0c066b7 commit 01ac595

13 files changed (+498 / -70 lines changed)
Lines changed: 199 additions & 0 deletions
@@ -0,0 +1,199 @@
// concurrencyhandler/dynamic_token_adjustment.go

package concurrencyhandler

import (
    "time"

    "go.uber.org/zap"
)

// AdjustConcurrencyLimit dynamically modifies the maximum concurrency limit
// based on the newLimit provided. This function helps in adjusting the concurrency
// limit in real time based on observed system performance and other metrics. It
// transfers the tokens from the old semaphore to the new one, ensuring that there is
// no loss of tokens during the transition.
func (ch *ConcurrencyHandler) AdjustConcurrencyLimit(newLimit int) {
    ch.lock.Lock()
    defer ch.lock.Unlock()

    if newLimit <= 0 {
        return // Avoid setting a non-positive limit
    }

    // Create a new semaphore with the desired limit
    newSem := make(chan struct{}, newLimit)

    // Transfer tokens from the old semaphore to the new one
    for i := 0; i < len(ch.sem) && i < newLimit; i++ {
        newSem <- struct{}{}
    }

    ch.sem = newSem
}

// AdjustConcurrencyBasedOnMetrics evaluates the current metrics and adjusts the
// concurrency limit if required. It checks metrics like average token acquisition
// time and decides on a new concurrency limit. The method ensures that the new
// limit respects the minimum and maximum allowed concurrency bounds.
func (ch *ConcurrencyHandler) AdjustConcurrencyBasedOnMetrics() {
    // Calculate the average acquisition time
    avgAcquisitionTime := ch.AverageAcquisitionTime()

    // Get the current concurrency limit
    currentLimit := cap(ch.sem)

    // Calculate the historical average acquisition time
    historicalAvgAcquisitionTime := ch.HistoricalAverageAcquisitionTime()

    // Decide on a new limit based on metrics
    newLimit := currentLimit
    if avgAcquisitionTime > time.Duration(float64(historicalAvgAcquisitionTime)*1.2) { // 20% increase in acquisition time
        newLimit = currentLimit - 2 // decrease concurrency more aggressively
    } else if avgAcquisitionTime < time.Duration(float64(historicalAvgAcquisitionTime)*0.8) { // 20% decrease in acquisition time
        newLimit = currentLimit + 2 // increase concurrency more aggressively
    } else if avgAcquisitionTime > historicalAvgAcquisitionTime {
        newLimit = currentLimit - 1 // decrease concurrency conservatively
    } else if avgAcquisitionTime < historicalAvgAcquisitionTime {
        newLimit = currentLimit + 1 // increase concurrency conservatively
    }

    // Ensure newLimit is within safety bounds
    if newLimit > MaxConcurrency {
        newLimit = MaxConcurrency
    } else if newLimit < MinConcurrency {
        newLimit = MinConcurrency
    }

    // Adjust concurrency if the new limit is different from the current one
    if newLimit != currentLimit {
        ch.AdjustConcurrencyLimit(newLimit)

        // Log the adjustment
        ch.logger.Debug("Adjusted concurrency",
            zap.Int("OldLimit", currentLimit),
            zap.Int("NewLimit", newLimit),
            zap.String("Reason", "Based on average acquisition time"),
            zap.Duration("AverageAcquisitionTime", avgAcquisitionTime),
            zap.Duration("HistoricalAverageAcquisitionTime", historicalAvgAcquisitionTime),
        )
    }
}

// EvaluateMetricsAndAdjustConcurrency evaluates the performance metrics and makes necessary
// adjustments to the concurrency limit. The method assesses the average response time
// and adjusts the concurrency based on how it compares to the historical average acquisition time.
// If the average response time has significantly increased compared to the historical average,
// the concurrency limit is decreased, and vice versa. The method ensures that the concurrency
// limit remains within the bounds defined by the system's best practices.
func (ch *ConcurrencyHandler) EvaluateMetricsAndAdjustConcurrency() {
    ch.PerfMetrics.lock.Lock()
    averageResponseTime := ch.PerfMetrics.TotalResponseTime / time.Duration(ch.PerfMetrics.TotalRequests)
    ch.PerfMetrics.lock.Unlock()

    historicalAverageAcquisitionTime := ch.HistoricalAverageAcquisitionTime()

    // Decide on the new limit based on the average response time compared to the historical average
    newLimit := cap(ch.sem) // Start with the current limit
    if averageResponseTime > time.Duration(float64(historicalAverageAcquisitionTime)*1.2) {
        // Decrease concurrency if the average response time has significantly increased
        newLimit -= 1
    } else if averageResponseTime < time.Duration(float64(historicalAverageAcquisitionTime)*0.8) {
        // Increase concurrency if the average response time has significantly decreased
        newLimit += 1
    }

    // Ensure the new limit is within the defined bounds
    if newLimit > MaxConcurrency {
        newLimit = MaxConcurrency
    } else if newLimit < MinConcurrency {
        newLimit = MinConcurrency
    }

    // Adjust the concurrency limit if the new limit is different from the current limit
    currentLimit := cap(ch.sem)
    if newLimit != currentLimit {
        ch.AdjustConcurrencyLimit(newLimit)

        // Log the adjustment for debugging purposes
        ch.logger.Debug("Adjusted concurrency",
            zap.Int("OldLimit", currentLimit),
            zap.Int("NewLimit", newLimit),
            zap.String("Reason", "Based on evaluation of metrics"),
            zap.Duration("AverageResponseTime", averageResponseTime),
            zap.Duration("HistoricalAverageAcquisitionTime", historicalAverageAcquisitionTime),
        )
    }
}

//------ Concurrency Monitoring Functions:

// StartMetricEvaluation continuously monitors the client's interactions with the API and adjusts the concurrency limits dynamically.
// The function evaluates metrics at regular intervals to detect burst activity patterns.
// If burst activity is detected (e.g., many requests in a short period), the evaluation interval is shortened for more frequent checks.
// Otherwise, it reverts to a default interval for regular checks.
// After each evaluation, the function calls EvaluateMetricsAndAdjustConcurrency to potentially adjust the concurrency based on observed metrics.
//
// The evaluation process works as follows:
// 1. Sleep for the defined evaluation interval.
// 2. Check whether there has been a burst in activity using the isBurstActivity method.
// 3. If a burst is detected, shorten the evaluation interval to monitor and adjust the concurrency more frequently.
// 4. If no burst is detected, maintain the default evaluation interval.
// 5. Evaluate the metrics and adjust the concurrency accordingly.
func (ch *ConcurrencyHandler) StartMetricEvaluation() {
    evalInterval := 5 * time.Minute // Initial interval

    for {
        time.Sleep(evalInterval) // Wait for the defined evaluation interval

        // Determine if there's been a burst in activity
        if ch.isBurstActivity() {
            evalInterval = 1 * time.Minute // Increase the frequency of checks during burst activity
        } else {
            evalInterval = 5 * time.Minute // Revert to the default interval outside of burst periods
        }

        // Evaluate the current metrics and adjust concurrency if necessary
        ch.EvaluateMetricsAndAdjustConcurrency()
    }
}

// isBurstActivity checks whether there has been a burst in activity based on the acquisition times of the tokens.
// A burst is considered to have occurred if the time since the last token acquisition is short.
func (ch *ConcurrencyHandler) isBurstActivity() bool {
    // Lock before checking the last token acquisition time
    ch.lock.Lock()
    defer ch.lock.Unlock()

    // Consider it a burst if the last token was acquired less than 2 minutes ago
    return time.Since(ch.lastTokenAcquisitionTime) < 2*time.Minute
}

// StartConcurrencyAdjustment launches a periodic checker that evaluates current metrics and adjusts concurrency limits if needed.
// It uses a ticker to periodically trigger the adjustment logic.
func (ch *ConcurrencyHandler) StartConcurrencyAdjustment() {
    ticker := time.NewTicker(EvaluationInterval)
    defer ticker.Stop()

    for range ticker.C {
        ch.AdjustConcurrencyBasedOnMetrics()
    }
}

// GetAverageAcquisitionTime returns the average time taken to acquire a token from the semaphore.
func (ch *ConcurrencyHandler) GetAverageAcquisitionTime() time.Duration {
    return ch.AverageAcquisitionTime()
}

// GetHistoricalAverageAcquisitionTime returns the historical average time taken to acquire a token from the semaphore.
func (ch *ConcurrencyHandler) GetHistoricalAverageAcquisitionTime() time.Duration {
    return ch.HistoricalAverageAcquisitionTime()
}

// GetPerformanceMetrics returns the current performance metrics of the ConcurrencyHandler.
// This includes counts of total requests, retries, rate limit errors, total response time,
// and token wait time.
func (ch *ConcurrencyHandler) GetPerformanceMetrics() *PerformanceMetrics {
    return ch.PerfMetrics
}

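The monitoring functions above read AcquisitionTimes and lastTokenAcquisitionTime, but the token acquire/release path that populates those fields is not part of this diff. As a rough sketch only, assuming the struct fields shown in handler.go below (acquireToken and releaseToken are illustrative names, not code from this commit), the buffered-channel semaphore would typically be fed like this:

// concurrencyhandler/semaphore_example.go (illustrative only, not part of this commit)
package concurrencyhandler

import "time"

// acquireToken blocks until a concurrency slot is free, then records how long
// the wait took so the adjustment logic has acquisition-time data to act on.
func (ch *ConcurrencyHandler) acquireToken() {
    start := time.Now()
    ch.sem <- struct{}{} // blocks while the channel buffer (the concurrency limit) is full

    ch.lock.Lock()
    ch.AcquisitionTimes = append(ch.AcquisitionTimes, time.Since(start))
    ch.lastTokenAcquisitionTime = time.Now()
    ch.lock.Unlock()
}

// releaseToken frees a slot for the next request.
func (ch *ConcurrencyHandler) releaseToken() {
    <-ch.sem
}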
concurrenyhandler/handler.go

Lines changed: 59 additions & 0 deletions
@@ -0,0 +1,59 @@
// concurrencyhandler/handler.go
package concurrencyhandler

import (
    "sync"
    "time"

    "github.com/deploymenttheory/go-api-http-client/logger"
)

// Constants and Data Structures:
const (
    MaxConcurrency     = 10              // Maximum allowed concurrent requests
    MinConcurrency     = 1               // Minimum allowed concurrent requests
    EvaluationInterval = 1 * time.Minute // Time interval for evaluating metrics and adjusting concurrency
)

// ConcurrencyHandler controls the number of concurrent HTTP requests.
type ConcurrencyHandler struct {
    sem                      chan struct{}
    logger                   logger.Logger
    AcquisitionTimes         []time.Duration
    lock                     sync.Mutex
    lastTokenAcquisitionTime time.Time
    PerfMetrics              *PerformanceMetrics
}

// PerformanceMetrics captures various metrics related to the client's
// interactions with the API.
type PerformanceMetrics struct {
    TotalRequests        int64
    TotalRetries         int64
    TotalRateLimitErrors int64
    TotalResponseTime    time.Duration
    TokenWaitTime        time.Duration
    lock                 sync.Mutex // Protects performance metrics fields
}

// NewConcurrencyHandler initializes a new ConcurrencyHandler with the given
// concurrency limit, logger, and performance metrics. The ConcurrencyHandler
// ensures that no more than the configured number of concurrent requests are
// made. It uses a semaphore to control concurrency.
func NewConcurrencyHandler(limit int, logger logger.Logger, perfMetrics *PerformanceMetrics) *ConcurrencyHandler {
    return &ConcurrencyHandler{
        sem:              make(chan struct{}, limit),
        logger:           logger,
        AcquisitionTimes: []time.Duration{},
        PerfMetrics:      perfMetrics,
    }
}

// requestIDKey is a type used as a key for storing and retrieving
// request-specific identifiers from a context.Context object. This private
// type ensures that the key is distinct and prevents accidental value
// retrieval or conflicts with other context keys. The value associated
// with this key in a context is typically a UUID that uniquely identifies
// a request being processed by the ConcurrencyHandler, allowing for
// fine-grained control and tracking of concurrent HTTP requests.
type requestIDKey struct{}

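handler.go supplies the constructor and the concurrency bounds used by the adjustment code in dynamic_token_adjustment.go. A minimal sketch of how a caller might wire the pieces together (startConcurrencyControl is a hypothetical helper, not part of this commit, and assumes a logger.Logger value is already available from the client's logger package):

// concurrencyhandler/example_usage.go (illustrative only, not part of this commit)
package concurrencyhandler

import "github.com/deploymenttheory/go-api-http-client/logger"

// startConcurrencyControl shows how the constructor and the two background
// adjustment loops defined above could be wired up by a caller.
func startConcurrencyControl(log logger.Logger) *ConcurrencyHandler {
    // Shared metrics store, updated by the HTTP client after each request.
    perfMetrics := &PerformanceMetrics{}

    // Start at the maximum; the background loops scale the limit between
    // MinConcurrency and MaxConcurrency as observed timings change.
    ch := NewConcurrencyHandler(MaxConcurrency, log, perfMetrics)

    // Both loops block, so they run on their own goroutines.
    go ch.StartMetricEvaluation()      // response-time driven, with a burst-aware check interval
    go ch.StartConcurrencyAdjustment() // acquisition-time driven, runs every EvaluationInterval

    return ch
}

With EvaluationInterval at one minute and the burst-aware loop checking every one to five minutes, the limit drifts gradually between MinConcurrency (1) and MaxConcurrency (10) rather than swinging on a single slow request.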
concurrenyhandler/metrics.go

Lines changed: 69 additions & 0 deletions
@@ -0,0 +1,69 @@
// concurrencyhandler/metrics.go
package concurrencyhandler

import "time"

// AverageAcquisitionTime computes the average time taken to acquire a token
// from the semaphore. It helps in understanding the contention for tokens
// and can be used to adjust concurrency limits.
func (ch *ConcurrencyHandler) AverageAcquisitionTime() time.Duration {
    ch.lock.Lock()
    defer ch.lock.Unlock()

    if len(ch.AcquisitionTimes) == 0 {
        return 0
    }

    totalTime := time.Duration(0)
    for _, t := range ch.AcquisitionTimes {
        totalTime += t
    }
    return totalTime / time.Duration(len(ch.AcquisitionTimes))
}

// HistoricalAverageAcquisitionTime computes the average time taken to acquire
// a token from the semaphore over a historical period (e.g., the last 5 minutes).
// It helps in understanding the historical contention for tokens and can be used
// to adjust concurrency limits.
func (ch *ConcurrencyHandler) HistoricalAverageAcquisitionTime() time.Duration {
    ch.lock.Lock()
    defer ch.lock.Unlock()

    // For simplicity, store the last 5 minutes of acquisition times.
    // With an EvaluationInterval of 1 minute, this means the last 5 data points.
    historicalCount := 5
    if len(ch.AcquisitionTimes) < historicalCount {
        return ch.AverageAcquisitionTime() // If there is not enough historical data, return the overall average
    }

    totalTime := time.Duration(0)
    for _, t := range ch.AcquisitionTimes[len(ch.AcquisitionTimes)-historicalCount:] {
        totalTime += t
    }
    return totalTime / time.Duration(historicalCount)
}

// UpdatePerformanceMetrics updates the ConcurrencyHandler's performance metrics
// by recording the duration of an HTTP request and incrementing the total
// request count. This function is thread-safe and uses a mutex to synchronize
// updates to the performance metrics.
//
// Parameters:
//   - duration: The time duration it took for an HTTP request to complete.
//
// This function should be called after each HTTP request to keep track of the
// ConcurrencyHandler's performance over time.
func (ch *ConcurrencyHandler) UpdatePerformanceMetrics(duration time.Duration) {
    ch.PerfMetrics.lock.Lock()
    defer ch.PerfMetrics.lock.Unlock()
    ch.PerfMetrics.TotalResponseTime += duration
    ch.PerfMetrics.TotalRequests++
}

// Min returns the smaller of the two integers.
func Min(a, b int) int {
    if a < b {
        return a
    }
    return b
}

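UpdatePerformanceMetrics only feeds EvaluateMetricsAndAdjustConcurrency if request durations are actually recorded at the call site. A hypothetical wrapper illustrating where that call would sit (timeRequest is not part of this commit):

// concurrencyhandler/metrics_example.go (illustrative only, not part of this commit)
package concurrencyhandler

import "time"

// timeRequest runs an HTTP request closure and records its duration so the
// adjustment logic has response-time data to work with.
func timeRequest(ch *ConcurrencyHandler, do func() error) error {
    start := time.Now()
    err := do()
    ch.UpdatePerformanceMetrics(time.Since(start)) // adds to TotalResponseTime, increments TotalRequests
    return err
}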
0 commit comments