Skip to content

Commit bc57dee

Browse files
authored
use context in indexer go routine instead of context.TODO (#1491)
* fixed bug of unprotected concurrent writes to a map in prefix indexer Signed-off-by: Nir Rozenbaum <[email protected]> * revert locking changes in indexer Signed-off-by: Nir Rozenbaum <[email protected]> * replace context.TODO with a context Signed-off-by: Nir Rozenbaum <[email protected]> --------- Signed-off-by: Nir Rozenbaum <[email protected]>
1 parent d7b2a56 commit bc57dee

File tree

3 files changed

+11
-10
lines changed

3 files changed

+11
-10
lines changed

pkg/epp/scheduling/framework/plugins/multi/prefix/indexer.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,15 @@ type indexer struct {
3838
}
3939

4040
// newIndexer initializes an indexer with size limits and starts cache size reporting.
41-
func newIndexer(maxLRUSize int) *indexer {
42-
ix := &indexer{
41+
func newIndexer(ctx context.Context, maxLRUSize int) *indexer {
42+
indexer := &indexer{
4343
hashToPods: make(map[BlockHash]podSet),
4444
podToLRU: make(map[ServerID]*lru.Cache[BlockHash, struct{}]),
4545
maxLRUSize: maxLRUSize,
4646
}
4747

48-
go ix.ReportLRUSize(time.Second)
49-
return ix
48+
go indexer.reportLRUSize(ctx, time.Second)
49+
return indexer
5050
}
5151

5252
// Add adds a list of prefix hashes to the cache, tied to the server.
@@ -111,8 +111,8 @@ func (i *indexer) makeEvictionFn(pod ServerID) func(BlockHash, struct{}) {
111111
}
112112
}
113113

114-
// ReportLRUSize starts a goroutine that periodically reports the LRU cache size metric.
115-
func (i *indexer) ReportLRUSize(interval time.Duration) {
114+
// reportLRUSize starts a goroutine that periodically reports the LRU cache size metric.
115+
func (i *indexer) reportLRUSize(ctx context.Context, interval time.Duration) {
116116
ticker := time.NewTicker(interval)
117117
defer ticker.Stop()
118118
for range ticker.C {
@@ -137,7 +137,7 @@ func (i *indexer) ReportLRUSize(interval time.Duration) {
137137
}
138138

139139
metrics.RecordPrefixCacheSize(int64(totalEntries))
140-
log.FromContext(context.TODO()).V(logutil.TRACE).Info("Prefix cache state",
140+
log.FromContext(ctx).V(logutil.TRACE).Info("Prefix cache state",
141141
"total entries", totalEntries,
142142
"# pods", numPods,
143143
"avg entries per pod", avg,

pkg/epp/scheduling/framework/plugins/multi/prefix/indexer_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,14 @@ limitations under the License.
1616
package prefix
1717

1818
import (
19+
"context"
1920
"testing"
2021

2122
"github.com/stretchr/testify/assert"
2223
)
2324

2425
func TestIndexer_AddAndGet(t *testing.T) {
25-
i := newIndexer(2)
26+
i := newIndexer(context.Background(), 2)
2627

2728
hash1 := BlockHash(1)
2829
server := ServerID{Namespace: "default", Name: "server1"}

pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ func New(ctx context.Context, config Config) *Plugin {
148148
capacity := config.LRUCapacityPerServer
149149
if capacity <= 0 {
150150
capacity = DefaultLRUCapacityPerServer
151-
log.FromContext(context.TODO()).V(logutil.DEFAULT).Info(
151+
log.FromContext(ctx).V(logutil.DEFAULT).Info(
152152
"LRUCapacityPerServer is not positive, using default value",
153153
"defaultCapacity", DefaultLRUCapacityPerServer,
154154
)
@@ -158,7 +158,7 @@ func New(ctx context.Context, config Config) *Plugin {
158158
typedName: plugins.TypedName{Type: PrefixCachePluginType, Name: PrefixCachePluginType},
159159
config: config,
160160
pluginState: plugins.NewPluginState(ctx),
161-
indexer: newIndexer(capacity),
161+
indexer: newIndexer(ctx, capacity),
162162
}
163163
}
164164

0 commit comments

Comments
 (0)