diff --git a/examples/kv_cache_index/main.go b/examples/kv_cache_index/main.go index 61c5beac..2b7f80d3 100644 --- a/examples/kv_cache_index/main.go +++ b/examples/kv_cache_index/main.go @@ -132,7 +132,7 @@ func runPrompts(ctx context.Context, kvCacheIndexer *kvcache.Indexer) error { ModelName: modelName, ChunkHash: h, } - }), []kvblock.PodEntry{{"pod1", "gpu"}}) + }), []kvblock.PodEntry{{PodIdentifier: "pod1", DeviceTier: "gpu"}}) // Sleep 3 secs time.Sleep(3 * time.Second) diff --git a/go.mod b/go.mod index bedf081b..08d82574 100644 --- a/go.mod +++ b/go.mod @@ -13,9 +13,11 @@ require ( github.com/pebbe/zmq4 v1.4.0 github.com/prometheus/client_golang v1.22.0 github.com/prometheus/client_model v0.6.1 - github.com/redis/go-redis/v9 v9.7.3 + github.com/redis/go-redis/extra/redisotel/v9 v9.14.0 + github.com/redis/go-redis/v9 v9.14.0 github.com/stretchr/testify v1.10.0 github.com/vmihailenco/msgpack/v5 v5.4.1 + go.opentelemetry.io/otel v1.33.0 golang.org/x/sync v0.12.0 k8s.io/apimachinery v0.33.0 k8s.io/client-go v0.33.0 @@ -29,6 +31,7 @@ require ( github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/jsonpointer v0.21.0 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect github.com/go-openapi/swag v0.23.0 // indirect @@ -46,10 +49,14 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/common v0.62.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect + github.com/redis/go-redis/extra/rediscmd/v9 v9.14.0 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/yuin/gopher-lua v1.1.1 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/otel/metric v1.33.0 // indirect + go.opentelemetry.io/otel/trace v1.33.0 // indirect golang.org/x/net v0.38.0 // indirect golang.org/x/oauth2 v0.27.0 // indirect golang.org/x/sys v0.35.0 // indirect diff --git a/go.sum b/go.sum index ed825a6a..ec68e3fb 100644 --- a/go.sum +++ b/go.sum @@ -27,8 +27,11 @@ github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxER github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E= github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs= github.com/go-openapi/jsonpointer v0.21.0 h1:YgdVicSA9vH5RiHs9TZW5oyafXZFc6+2Vc1rr/O9oNQ= github.com/go-openapi/jsonpointer v0.21.0/go.mod h1:IUyH9l/+uyhIYQ/PXVA41Rexl+kOkAPDdXEYns6fzUY= @@ -94,8 +97,12 @@ github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= -github.com/redis/go-redis/v9 v9.7.3 h1:YpPyAayJV+XErNsatSElgRZZVCwXX9QzkKYNvO7x0wM= -github.com/redis/go-redis/v9 v9.7.3/go.mod h1:bGUrSggJ9X9GUmZpZNEOQKaANxSGgOEBRltRTZHSvrA= +github.com/redis/go-redis/extra/rediscmd/v9 v9.14.0 h1:DF7JP9CeCIEWbvVKA3r7dxCB1cUvEm+cD8fgWCn7R0g= +github.com/redis/go-redis/extra/rediscmd/v9 v9.14.0/go.mod h1:JCn91QtwR6qo3PEs35hcpBSirjqKpKwSSjnZX4kYgI0= +github.com/redis/go-redis/extra/redisotel/v9 v9.14.0 h1:kXIdyUBHeXsR1foSU+qdZjo3tROk5Rb2HS1kp99YuPM= +github.com/redis/go-redis/extra/redisotel/v9 v9.14.0/go.mod h1:LafdjmKxzRKYznKgcVeqS3vIiBCsY90JbB0pDgHt774= +github.com/redis/go-redis/v9 v9.14.0 h1:u4tNCjXOyzfgeLN+vAZaW1xUooqWDqVEsZN0U01jfAE= +github.com/redis/go-redis/v9 v9.14.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= @@ -121,6 +128,16 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M= github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/otel v1.33.0 h1:/FerN9bax5LoK51X/sI0SVYrjSE0/yUL7DpxW4K3FWw= +go.opentelemetry.io/otel v1.33.0/go.mod h1:SUUkR6csvUQl+yjReHu5uM3EtVV7MBm5FHKRlNx4I8I= +go.opentelemetry.io/otel/metric v1.33.0 h1:r+JOocAyeRVXD8lZpjdQjzMadVZp2M4WmQ+5WtEnklQ= +go.opentelemetry.io/otel/metric v1.33.0/go.mod h1:L9+Fyctbp6HFTddIxClbQkjtubW6O9QS3Ann/M82u6M= +go.opentelemetry.io/otel/sdk v1.33.0 h1:iax7M131HuAm9QkZotNHEfstof92xM+N8sr3uHXc2IM= +go.opentelemetry.io/otel/sdk v1.33.0/go.mod h1:A1Q5oi7/9XaMlIWzPSxLRWOI8nG3FnzHJNbiENQuihM= +go.opentelemetry.io/otel/trace v1.33.0 h1:cCJuF7LRjUFso9LPnEAHJDB2pqzp+hbO8eu1qqW2d/s= +go.opentelemetry.io/otel/trace v1.33.0/go.mod h1:uIcdVUZMpTAmz0tI1z04GoVSezK37CbGV4fr1f2nBck= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/pkg/kvcache/indexer.go b/pkg/kvcache/indexer.go index 3a9574e5..ed33dec3 100644 --- a/pkg/kvcache/indexer.go +++ b/pkg/kvcache/indexer.go @@ -20,6 +20,8 @@ import ( "context" "fmt" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" @@ -117,30 +119,115 @@ func (k *Indexer) KVBlockIndex() kvblock.Index { func (k *Indexer) GetPodScores(ctx context.Context, prompt, modelName string, podIdentifiers []string, ) (map[string]int, error) { + tracer := otel.GetTracerProvider().Tracer("llm-d-kv-cache-manager") + ctx, span := tracer.Start(ctx, "llm_d.kv_cache_manager.GetPodScores") + defer span.End() + + span.SetAttributes( + attribute.String("gen_ai.request.model", modelName), + attribute.Int("llm_d.kv_cache_manager.pod_count", len(podIdentifiers)), + ) + traceLogger := klog.FromContext(ctx).V(logging.TRACE).WithName("kvcache.GetPodScores") // 1. tokenize prompt + // 1. get available tokens of longest prefix + _, tokenSpan := tracer.Start(ctx, "llm_d.kv_cache_manager.find_tokens") + tokenSpan.SetAttributes( + attribute.String("gen_ai.request.model", modelName), + ) tokens := k.tokenizersPool.Tokenize(prompt, modelName) + if len(tokens) == 0 { + tokenSpan.SetAttributes( + attribute.Int("llm_d.kv_cache_manager.tokens_found", 0), + attribute.String("operation.outcome", "success"), + ) + tokenSpan.End() + //nolint:nilnil // no need to return an error + return nil, nil + } + tokenSpan.SetAttributes( + attribute.Int("llm_d.kv_cache_manager.tokens_found", len(tokens)), + attribute.String("operation.outcome", "success"), + ) + tokenSpan.End() // 2. get block keys + _, blockSpan := tracer.Start(ctx, "llm_d.kv_cache_manager.tokens_to_block_keys") + blockSpan.SetAttributes( + attribute.String("gen_ai.request.model", modelName), + attribute.Int("llm_d.kv_cache_manager.input_tokens", len(tokens)), + ) blockKeys := k.tokensProcessor.TokensToKVBlockKeys(tokens, modelName) + blockSpan.SetAttributes( + attribute.Int("llm_d.kv_cache_manager.block_keys_generated", len(blockKeys)), + attribute.String("operation.outcome", "success"), + ) + blockSpan.End() traceLogger.Info("found tokens", "tokens", tokens, "block-keys", blockKeys) // 3. query kvblock indexer for pods + _, lookupSpan := tracer.Start(ctx, "llm_d.kv_cache_manager.lookup_pods") + lookupSpan.SetAttributes( + attribute.String("gen_ai.request.model", modelName), + attribute.Int("llm_d.kv_cache_manager.block_keys_count", len(blockKeys)), + ) keyToPods, err := k.kvBlockIndex.Lookup(ctx, blockKeys, sets.New(podIdentifiers...)) if err != nil { + lookupSpan.RecordError(err) + lookupSpan.SetAttributes(attribute.String("operation.outcome", "error")) + lookupSpan.End() + span.RecordError(err) + span.SetAttributes(attribute.String("operation.outcome", "error")) return nil, fmt.Errorf("failed to query kvblock indexer: %w", err) } + lookupSpan.SetAttributes( + attribute.Int("llm_d.kv_cache_manager.lookup_results", len(keyToPods)), + attribute.String("operation.outcome", "success"), + ) + lookupSpan.End() traceLogger.Info("found block keys", "block-keys", blockKeys, "pods", podsPerKeyPrintHelper(keyToPods)) // 4. score pods + _, scoreSpan := tracer.Start(ctx, "llm_d.kv_cache_manager.score_pods") + scoreSpan.SetAttributes( + attribute.String("gen_ai.request.model", modelName), + attribute.Int("llm_d.kv_cache_manager.block_keys_count", len(blockKeys)), + ) podScores, err := k.kvBlockScorer.Score(blockKeys, keyToPods) if err != nil { + scoreSpan.RecordError(err) + scoreSpan.SetAttributes(attribute.String("operation.outcome", "error")) + scoreSpan.End() + span.RecordError(err) + span.SetAttributes(attribute.String("operation.outcome", "error")) return nil, fmt.Errorf("failed to query kvblock scorer: %w", err) } + scoreSpan.SetAttributes( + attribute.Int("llm_d.kv_cache_manager.scored_pods", len(podScores)), + attribute.String("operation.outcome", "success"), + ) + scoreSpan.End() traceLogger.Info("found pod scores", "pod-scores", podScores) + // Calculate hit ratio for observability + totalPods := len(podIdentifiers) + if totalPods == 0 { + // If no specific pods requested, use all pods with scores + totalPods = len(podScores) + } + + var hitRatio float64 + if totalPods > 0 { + hitRatio = float64(len(podScores)) / float64(totalPods) + } + + span.SetAttributes( + attribute.Float64("llm_d.kv_cache_manager.hit_ratio", hitRatio), + attribute.String("operation.outcome", "success"), + ) + return podScores, nil } diff --git a/pkg/kvcache/kvblock/index.go b/pkg/kvcache/kvblock/index.go index 45bb9442..2fb27897 100644 --- a/pkg/kvcache/kvblock/index.go +++ b/pkg/kvcache/kvblock/index.go @@ -49,6 +49,7 @@ type IndexConfig struct { func DefaultIndexConfig() *IndexConfig { return &IndexConfig{ InMemoryConfig: DefaultInMemoryIndexConfig(), + RedisConfig: DefaultRedisIndexConfig(), EnableMetrics: false, } } diff --git a/pkg/kvcache/kvblock/redis.go b/pkg/kvcache/kvblock/redis.go index 41689c69..98c3c5c0 100644 --- a/pkg/kvcache/kvblock/redis.go +++ b/pkg/kvcache/kvblock/redis.go @@ -23,6 +23,7 @@ import ( "strings" "time" + "github.com/redis/go-redis/extra/redisotel/v9" "github.com/redis/go-redis/v9" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" @@ -57,6 +58,17 @@ func NewRedisIndex(config *RedisIndexConfig) (Index, error) { } redisClient := redis.NewClient(redisOpt) + + // Enable automatic OpenTelemetry tracing for Redis operations + if err := redisotel.InstrumentTracing(redisClient); err != nil { + return nil, fmt.Errorf("failed to instrument Redis tracing: %w", err) + } + + // Enable automatic OpenTelemetry metrics for Redis operations + if err := redisotel.InstrumentMetrics(redisClient); err != nil { + return nil, fmt.Errorf("failed to instrument Redis metrics: %w", err) + } + if err := redisClient.Ping(context.Background()).Err(); err != nil { return nil, fmt.Errorf("failed to connect to Redis: %w", err) }