Skip to content

Commit 70b0532

Browse files
committed
improve config_changes lookup for existing ones
1 parent 64051fe commit 70b0532

File tree

3 files changed

+68
-43
lines changed

3 files changed

+68
-43
lines changed

api/cache.go

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@ type TempCache struct {
1616
ctx context.Context
1717
items map[string]models.ConfigItem
1818
aliases map[string]string
19-
20-
changes map[string]struct{}
2119
}
2220

2321
func (t TempCache) FindExternal(ext v1.ExternalID) (*models.ConfigItem, error) {
@@ -78,38 +76,6 @@ func (t TempCache) Insert(item models.ConfigItem) {
7876
t.items[strings.ToLower(item.ID)] = item
7977
}
8078

81-
func (t TempCache) IsChangePersisted(configID, externalChangeID string) (bool, error) {
82-
if configID == "" || externalChangeID == "" {
83-
return false, nil
84-
}
85-
86-
configID = strings.ToLower(configID)
87-
externalChangeID = strings.ToLower(externalChangeID)
88-
89-
if t.changes == nil {
90-
t.changes = make(map[string]struct{})
91-
}
92-
93-
if _, ok := t.changes[configID+externalChangeID]; ok {
94-
return true, nil
95-
}
96-
97-
var result models.ConfigChange
98-
if err := t.ctx.DB().Select("id").Where("config_id = ?", configID).
99-
Where("external_change_id = ?", externalChangeID).
100-
Limit(1).
101-
Find(&result).Error; err != nil {
102-
return false, err
103-
}
104-
105-
if result.ID != "" {
106-
t.changes[configID+externalChangeID] = struct{}{}
107-
return true, nil
108-
}
109-
110-
return false, nil
111-
}
112-
11379
func (t TempCache) Get(id string) (*models.ConfigItem, error) {
11480
id = strings.ToLower(id)
11581
if id == "" {

db/changes.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"time"
66

77
sw "github.com/RussellLuo/slidingwindow"
8+
"github.com/patrickmn/go-cache"
9+
"github.com/samber/lo"
810

911
"github.com/flanksource/commons/collections"
1012
"github.com/flanksource/config-db/api"
@@ -19,6 +21,8 @@ const (
1921
ChangeTypeTooManyChanges = "TooManyChanges"
2022
)
2123

24+
var configChangesCache = cache.New(time.Hour*24, time.Hour*24)
25+
2226
func GetWorkflowRunCount(ctx api.ScrapeContext, workflowID string) (int64, error) {
2327
var count int64
2428
err := ctx.DB().Table("config_changes").
@@ -122,6 +126,7 @@ func syncCurrentlyRateLimitedConfigs(ctx api.ScrapeContext, window time.Duration
122126
if err != nil {
123127
return nil, err
124128
}
129+
defer rows.Close()
125130

126131
output := make(map[string]struct{})
127132
for rows.Next() {
@@ -156,6 +161,7 @@ func syncWindow(ctx api.ScrapeContext, max int, window time.Duration) error {
156161
if err != nil {
157162
return err
158163
}
164+
defer rows.Close()
159165

160166
for rows.Next() {
161167
var configID string
@@ -179,3 +185,54 @@ func syncWindow(ctx api.ScrapeContext, max int, window time.Duration) error {
179185

180186
return rows.Err()
181187
}
188+
189+
// filterOutPersistedChanges returns only those changes that weren't seen in the db.
190+
func filterOutPersistedChanges(ctx api.ScrapeContext, changes []*models.ConfigChange) ([]*models.ConfigChange, error) {
191+
// use cache to filter out ones that we've already seen before
192+
changes = lo.Filter(changes, func(c *models.ConfigChange, _ int) bool {
193+
_, found := configChangesCache.Get(c.ConfigID + c.ExternalChangeId)
194+
if found {
195+
_ = found
196+
}
197+
return !found
198+
})
199+
200+
if len(changes) == 0 {
201+
return nil, nil
202+
}
203+
204+
query := `SELECT config_id, external_change_id
205+
FROM config_changes
206+
WHERE (config_id, external_change_id) IN ?`
207+
args := lo.Map(changes, func(c *models.ConfigChange, _ int) []string {
208+
return []string{c.ConfigID, c.ExternalChangeId}
209+
})
210+
211+
rows, err := ctx.DB().Raw(query, args).Rows()
212+
if err != nil {
213+
return nil, err
214+
}
215+
defer rows.Close()
216+
217+
existing := make(map[string]struct{})
218+
for rows.Next() {
219+
var configID, externalChangeID string
220+
if err := rows.Scan(&configID, &externalChangeID); err != nil {
221+
return nil, err
222+
}
223+
224+
configChangesCache.SetDefault(configID+externalChangeID, struct{}{})
225+
existing[configID+externalChangeID] = struct{}{}
226+
}
227+
228+
newOnes := lo.Filter(changes, func(c *models.ConfigChange, _ int) bool {
229+
_, found := existing[c.ConfigID+c.ExternalChangeId]
230+
return !found
231+
})
232+
233+
if len(newOnes) > 0 {
234+
_ = query
235+
}
236+
237+
return newOnes, nil
238+
}

db/update.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -209,8 +209,8 @@ func shouldExcludeChange(result *v1.ScrapeResult, changeResult v1.ChangeResult)
209209

210210
func extractChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.ConfigItem) ([]*models.ConfigChange, []*models.ConfigChange, error) {
211211
var (
212-
newOnes = []*models.ConfigChange{}
213-
updates = []*models.ConfigChange{}
212+
toInsert = []*models.ConfigChange{}
213+
toUpdate = []*models.ConfigChange{}
214214
)
215215

216216
changes.ProcessRules(result, result.BaseScraper.Transform.Change.Mapping...)
@@ -267,17 +267,19 @@ func extractChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.C
267267
}
268268

269269
if changeResult.UpdateExisting {
270-
updates = append(updates, change)
270+
toUpdate = append(toUpdate, change)
271271
} else {
272-
if ok, err := ctx.TempCache().IsChangePersisted(change.ConfigID, change.ExternalChangeId); err != nil {
273-
return nil, nil, fmt.Errorf("failed to check if change is persisted: %w", err)
274-
} else if !ok {
275-
newOnes = append(newOnes, change)
276-
}
272+
toInsert = append(toInsert, change)
277273
}
278274
}
279275

280-
return newOnes, updates, nil
276+
// Remove the changes that have already been inserted.
277+
newOnes, err := filterOutPersistedChanges(ctx, toInsert)
278+
if err != nil {
279+
return nil, nil, err
280+
}
281+
282+
return newOnes, toUpdate, nil
281283
}
282284

283285
func upsertAnalysis(ctx api.ScrapeContext, result *v1.ScrapeResult) error {

0 commit comments

Comments
 (0)