Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/kv_cache_index/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func runPrompts(ctx context.Context, kvCacheIndexer *kvcache.Indexer) error {
ModelName: modelName,
ChunkHash: h,
}
}), []kvblock.PodEntry{{"pod1", "gpu"}})
}), []kvblock.PodEntry{{PodIdentifier: "pod1", DeviceTier: "gpu"}})

// Sleep 3 secs
time.Sleep(3 * time.Second)
Expand Down
9 changes: 8 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ require (
github.com/pebbe/zmq4 v1.4.0
github.com/prometheus/client_golang v1.22.0
github.com/prometheus/client_model v0.6.1
github.com/redis/go-redis/v9 v9.7.3
github.com/redis/go-redis/extra/redisotel/v9 v9.14.0
github.com/redis/go-redis/v9 v9.14.0
github.com/stretchr/testify v1.10.0
github.com/vmihailenco/msgpack/v5 v5.4.1
go.opentelemetry.io/otel v1.33.0
golang.org/x/sync v0.12.0
k8s.io/apimachinery v0.33.0
k8s.io/client-go v0.33.0
Expand All @@ -29,6 +31,7 @@ require (
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.23.0 // indirect
Expand All @@ -46,10 +49,14 @@ require (
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/common v0.62.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/redis/go-redis/extra/rediscmd/v9 v9.14.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/otel/metric v1.33.0 // indirect
go.opentelemetry.io/otel/trace v1.33.0 // indirect
golang.org/x/net v0.38.0 // indirect
golang.org/x/oauth2 v0.27.0 // indirect
golang.org/x/sys v0.35.0 // indirect
Expand Down
21 changes: 19 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxER
github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E=
github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs=
github.com/go-openapi/jsonpointer v0.21.0 h1:YgdVicSA9vH5RiHs9TZW5oyafXZFc6+2Vc1rr/O9oNQ=
github.com/go-openapi/jsonpointer v0.21.0/go.mod h1:IUyH9l/+uyhIYQ/PXVA41Rexl+kOkAPDdXEYns6fzUY=
Expand Down Expand Up @@ -94,8 +97,12 @@ github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ
github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/redis/go-redis/v9 v9.7.3 h1:YpPyAayJV+XErNsatSElgRZZVCwXX9QzkKYNvO7x0wM=
github.com/redis/go-redis/v9 v9.7.3/go.mod h1:bGUrSggJ9X9GUmZpZNEOQKaANxSGgOEBRltRTZHSvrA=
github.com/redis/go-redis/extra/rediscmd/v9 v9.14.0 h1:DF7JP9CeCIEWbvVKA3r7dxCB1cUvEm+cD8fgWCn7R0g=
github.com/redis/go-redis/extra/rediscmd/v9 v9.14.0/go.mod h1:JCn91QtwR6qo3PEs35hcpBSirjqKpKwSSjnZX4kYgI0=
github.com/redis/go-redis/extra/redisotel/v9 v9.14.0 h1:kXIdyUBHeXsR1foSU+qdZjo3tROk5Rb2HS1kp99YuPM=
github.com/redis/go-redis/extra/redisotel/v9 v9.14.0/go.mod h1:LafdjmKxzRKYznKgcVeqS3vIiBCsY90JbB0pDgHt774=
github.com/redis/go-redis/v9 v9.14.0 h1:u4tNCjXOyzfgeLN+vAZaW1xUooqWDqVEsZN0U01jfAE=
github.com/redis/go-redis/v9 v9.14.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
Expand All @@ -121,6 +128,16 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/otel v1.33.0 h1:/FerN9bax5LoK51X/sI0SVYrjSE0/yUL7DpxW4K3FWw=
go.opentelemetry.io/otel v1.33.0/go.mod h1:SUUkR6csvUQl+yjReHu5uM3EtVV7MBm5FHKRlNx4I8I=
go.opentelemetry.io/otel/metric v1.33.0 h1:r+JOocAyeRVXD8lZpjdQjzMadVZp2M4WmQ+5WtEnklQ=
go.opentelemetry.io/otel/metric v1.33.0/go.mod h1:L9+Fyctbp6HFTddIxClbQkjtubW6O9QS3Ann/M82u6M=
go.opentelemetry.io/otel/sdk v1.33.0 h1:iax7M131HuAm9QkZotNHEfstof92xM+N8sr3uHXc2IM=
go.opentelemetry.io/otel/sdk v1.33.0/go.mod h1:A1Q5oi7/9XaMlIWzPSxLRWOI8nG3FnzHJNbiENQuihM=
go.opentelemetry.io/otel/trace v1.33.0 h1:cCJuF7LRjUFso9LPnEAHJDB2pqzp+hbO8eu1qqW2d/s=
go.opentelemetry.io/otel/trace v1.33.0/go.mod h1:uIcdVUZMpTAmz0tI1z04GoVSezK37CbGV4fr1f2nBck=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand Down
87 changes: 87 additions & 0 deletions pkg/kvcache/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"fmt"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -117,30 +119,115 @@ func (k *Indexer) KVBlockIndex() kvblock.Index {
func (k *Indexer) GetPodScores(ctx context.Context, prompt, modelName string,
podIdentifiers []string,
) (map[string]int, error) {
tracer := otel.GetTracerProvider().Tracer("llm-d-kv-cache-manager")
ctx, span := tracer.Start(ctx, "llm_d.kv_cache_manager.GetPodScores")
defer span.End()

span.SetAttributes(
attribute.String("gen_ai.request.model", modelName),
attribute.Int("llm_d.kv_cache_manager.pod_count", len(podIdentifiers)),
)

traceLogger := klog.FromContext(ctx).V(logging.TRACE).WithName("kvcache.GetPodScores")

// 1. tokenize prompt
// 1. get available tokens of longest prefix
_, tokenSpan := tracer.Start(ctx, "llm_d.kv_cache_manager.find_tokens")
tokenSpan.SetAttributes(
attribute.String("gen_ai.request.model", modelName),
)
tokens := k.tokenizersPool.Tokenize(prompt, modelName)
if len(tokens) == 0 {
tokenSpan.SetAttributes(
attribute.Int("llm_d.kv_cache_manager.tokens_found", 0),
attribute.String("operation.outcome", "success"),
)
tokenSpan.End()
//nolint:nilnil // no need to return an error
return nil, nil
}
tokenSpan.SetAttributes(
attribute.Int("llm_d.kv_cache_manager.tokens_found", len(tokens)),
attribute.String("operation.outcome", "success"),
)
tokenSpan.End()

// 2. get block keys
_, blockSpan := tracer.Start(ctx, "llm_d.kv_cache_manager.tokens_to_block_keys")
blockSpan.SetAttributes(
attribute.String("gen_ai.request.model", modelName),
attribute.Int("llm_d.kv_cache_manager.input_tokens", len(tokens)),
)
blockKeys := k.tokensProcessor.TokensToKVBlockKeys(tokens, modelName)
blockSpan.SetAttributes(
attribute.Int("llm_d.kv_cache_manager.block_keys_generated", len(blockKeys)),
attribute.String("operation.outcome", "success"),
)
blockSpan.End()
traceLogger.Info("found tokens", "tokens", tokens, "block-keys", blockKeys)

// 3. query kvblock indexer for pods
_, lookupSpan := tracer.Start(ctx, "llm_d.kv_cache_manager.lookup_pods")
lookupSpan.SetAttributes(
attribute.String("gen_ai.request.model", modelName),
attribute.Int("llm_d.kv_cache_manager.block_keys_count", len(blockKeys)),
)
keyToPods, err := k.kvBlockIndex.Lookup(ctx, blockKeys, sets.New(podIdentifiers...))
if err != nil {
lookupSpan.RecordError(err)
lookupSpan.SetAttributes(attribute.String("operation.outcome", "error"))
lookupSpan.End()
span.RecordError(err)
span.SetAttributes(attribute.String("operation.outcome", "error"))
return nil, fmt.Errorf("failed to query kvblock indexer: %w", err)
}
lookupSpan.SetAttributes(
attribute.Int("llm_d.kv_cache_manager.lookup_results", len(keyToPods)),
attribute.String("operation.outcome", "success"),
)
lookupSpan.End()
traceLogger.Info("found block keys", "block-keys", blockKeys,
"pods", podsPerKeyPrintHelper(keyToPods))

// 4. score pods
_, scoreSpan := tracer.Start(ctx, "llm_d.kv_cache_manager.score_pods")
scoreSpan.SetAttributes(
attribute.String("gen_ai.request.model", modelName),
attribute.Int("llm_d.kv_cache_manager.block_keys_count", len(blockKeys)),
)
podScores, err := k.kvBlockScorer.Score(blockKeys, keyToPods)
if err != nil {
scoreSpan.RecordError(err)
scoreSpan.SetAttributes(attribute.String("operation.outcome", "error"))
scoreSpan.End()
span.RecordError(err)
span.SetAttributes(attribute.String("operation.outcome", "error"))
return nil, fmt.Errorf("failed to query kvblock scorer: %w", err)
}
scoreSpan.SetAttributes(
attribute.Int("llm_d.kv_cache_manager.scored_pods", len(podScores)),
attribute.String("operation.outcome", "success"),
)
scoreSpan.End()
traceLogger.Info("found pod scores", "pod-scores", podScores)

// Calculate hit ratio for observability
totalPods := len(podIdentifiers)
if totalPods == 0 {
// If no specific pods requested, use all pods with scores
totalPods = len(podScores)
}

var hitRatio float64
if totalPods > 0 {
hitRatio = float64(len(podScores)) / float64(totalPods)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if podScores == 0?

}

span.SetAttributes(
attribute.Float64("llm_d.kv_cache_manager.hit_ratio", hitRatio),
attribute.String("operation.outcome", "success"),
)

return podScores, nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/kvcache/kvblock/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type IndexConfig struct {
// DefaultIndexConfig builds an IndexConfig populated with the default
// in-memory and Redis backend configurations and with metrics disabled.
func DefaultIndexConfig() *IndexConfig {
	cfg := new(IndexConfig)
	cfg.InMemoryConfig = DefaultInMemoryIndexConfig()
	cfg.RedisConfig = DefaultRedisIndexConfig()
	cfg.EnableMetrics = false
	return cfg
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/kvcache/kvblock/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strings"
"time"

"github.com/redis/go-redis/extra/redisotel/v9"
"github.com/redis/go-redis/v9"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -57,6 +58,17 @@ func NewRedisIndex(config *RedisIndexConfig) (Index, error) {
}

redisClient := redis.NewClient(redisOpt)

// Enable automatic OpenTelemetry tracing for Redis operations
if err := redisotel.InstrumentTracing(redisClient); err != nil {
return nil, fmt.Errorf("failed to instrument Redis tracing: %w", err)
}

// Enable automatic OpenTelemetry metrics for Redis operations
if err := redisotel.InstrumentMetrics(redisClient); err != nil {
return nil, fmt.Errorf("failed to instrument Redis metrics: %w", err)
}

if err := redisClient.Ping(context.Background()).Err(); err != nil {
return nil, fmt.Errorf("failed to connect to Redis: %w", err)
}
Expand Down
Loading