From 7645d1bba3f15c268a30662b57aa494acf08c75f Mon Sep 17 00:00:00 2001 From: Jiekun Date: Mon, 14 Jul 2025 11:57:03 +0800 Subject: [PATCH 1/6] feature: [trace_id idx] add trace_id index to accelerate query execution --- app/vtinsert/opentelemetry/opentelemetry.go | 16 +- app/vtselect/traces/query/query.go | 246 ++++++++++++------ .../opentelemetry/pb/trace_fields.go | 7 + 3 files changed, 187 insertions(+), 82 deletions(-) diff --git a/app/vtinsert/opentelemetry/opentelemetry.go b/app/vtinsert/opentelemetry/opentelemetry.go index 16d1da8f6..817d3427b 100644 --- a/app/vtinsert/opentelemetry/opentelemetry.go +++ b/app/vtinsert/opentelemetry/opentelemetry.go @@ -10,10 +10,9 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" - "github.com/VictoriaMetrics/metrics" - "github.com/VictoriaMetrics/VictoriaTraces/app/vtinsert/insertutil" otelpb "github.com/VictoriaMetrics/VictoriaTraces/lib/protoparser/opentelemetry/pb" + "github.com/VictoriaMetrics/metrics" ) var maxRequestSize = flagutil.NewBytes("opentelemetry.traces.maxRequestSize", 64*1024*1024, "The maximum size in bytes of a single OpenTelemetry trace export request.") @@ -26,8 +25,9 @@ var ( ) var ( - mandatoryStreamFields = []string{otelpb.ResourceAttrServiceName, otelpb.NameField} - msgFieldValue = "-" + mandatoryStreamFields = []string{otelpb.ResourceAttrServiceName, otelpb.NameField} + traceIDIndexStreamFields = []logstorage.Field{{Name: otelpb.TraceIDIndexStreamName, Value: otelpb.TraceIDIndexStreamValue}} + msgFieldValue = "-" ) // RequestHandler processes Opentelemetry insert requests @@ -176,6 +176,14 @@ func pushFieldsFromSpan(span *otelpb.Span, scopeCommonFields []logstorage.Field, Value: msgFieldValue, }) lmp.AddRow(int64(span.EndTimeUnixNano), fields, nil) + + // for root span, create an entity in trace-id-idx stream. 
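+	// The index entry stores only the trace_id and the root span's start time, so
+	// GetTrace can resolve the trace's time range from this small dedicated stream
+	// instead of scanning every span stream across the whole retention period.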
+ if span.ParentSpanID == "" { + lmp.AddRow(int64(span.StartTimeUnixNano), []logstorage.Field{ + {Name: otelpb.TraceIDIndexFieldName, Value: span.TraceID}, + {Name: "_msg", Value: msgFieldValue}, + }, traceIDIndexStreamFields) + } return fields } diff --git a/app/vtselect/traces/query/query.go b/app/vtselect/traces/query/query.go index 68dc33947..310185281 100644 --- a/app/vtselect/traces/query/query.go +++ b/app/vtselect/traces/query/query.go @@ -12,6 +12,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaTraces/app/vtstorage" otelpb "github.com/VictoriaMetrics/VictoriaTraces/lib/protoparser/opentelemetry/pb" @@ -137,89 +138,22 @@ func GetSpanNameList(ctx context.Context, cp *CommonParams, serviceName string) func GetTrace(ctx context.Context, cp *CommonParams, traceID string) ([]*Row, error) { currentTime := time.Now() - // query: trace_id:traceID - qStr := fmt.Sprintf(otelpb.TraceIDField+": %q", traceID) + // query: {trace_id_idx="-"} AND trace_id:traceID + qStr := fmt.Sprintf(`{%s="%s"} AND %s:=%q | fields _time`, otelpb.TraceIDIndexStreamName, otelpb.TraceIDIndexStreamValue, otelpb.TraceIDIndexFieldName, traceID) q, err := logstorage.ParseQueryAtTimestamp(qStr, currentTime.UnixNano()) + + traceStartTime, err := findTraceIDTimeSplitTimeRange(ctx, q, cp) if err != nil { - return nil, fmt.Errorf("cannot parse query [%s]: %s", qStr, err) + return nil, fmt.Errorf("cannot find trace_id %q start time: %s", traceID, err) } - ctxWithCancel, cancel := context.WithCancel(ctx) - - // search for trace spans and write to `rows []*Row` - var rowsLock sync.Mutex - var rows []*Row - var missingTimeColumn atomic.Bool - writeBlock := func(_ uint, db *logstorage.DataBlock) { - if missingTimeColumn.Load() { - return - } - - columns := db.Columns - clonedColumnNames := make([]string, len(columns)) - for i, c := range columns { - clonedColumnNames[i] = strings.Clone(c.Name) - } - - timestamps, ok := db.GetTimestamps(nil) - if !ok { - missingTimeColumn.Store(true) - cancel() - return - } - - for i, timestamp := range timestamps { - fields := make([]logstorage.Field, 0, len(columns)) - for j := range columns { - // column could be empty if this span does not contain such field. - // only append non-empty columns. - if columns[j].Values[i] != "" { - fields = append(fields, logstorage.Field{ - Name: clonedColumnNames[j], - Value: strings.Clone(columns[j].Values[i]), - }) - } - } - - rowsLock.Lock() - rows = append(rows, &Row{ - Timestamp: timestamp, - Fields: fields, - }) - rowsLock.Unlock() - } + // fast path: trace start time found, search in [trace start time, trace start time + *traceMaxDurationWindow] time range. + if !traceStartTime.IsZero() { + return findSpansByTraceIDAndTime(ctx, cp, traceID, traceStartTime, traceStartTime.Add(*traceMaxDurationWindow)) } - - startTime := currentTime.Add(-*traceSearchStep) - endTime := currentTime - for startTime.UnixNano() > 0 { // todo: no need to search time range before retention period. - qq := q.CloneWithTimeFilter(currentTime.UnixNano(), startTime.UnixNano(), endTime.UnixNano()) - if err = vtstorage.RunQuery(ctxWithCancel, cp.TenantIDs, qq, writeBlock); err != nil { - return nil, err - } - if missingTimeColumn.Load() { - return nil, fmt.Errorf("missing _time column in the result for the query [%s]", qq) - } - - // no hit in this time range, continue with step. 
- if len(rows) == 0 { - endTime = startTime - startTime = startTime.Add(-*traceSearchStep) - continue - } - - // found result, perform extra search for traceMaxDurationWindow and then break. - qq = q.CloneWithTimeFilter(currentTime.UnixNano(), startTime.Add(-*traceMaxDurationWindow).UnixNano(), startTime.UnixNano()) - if err = vtstorage.RunQuery(ctxWithCancel, cp.TenantIDs, qq, writeBlock); err != nil { - return nil, err - } - if missingTimeColumn.Load() { - return nil, fmt.Errorf("missing _time column in the result for the query [%s]", qq) - } - break - } - - return rows, nil + // slow path: if trace start time not exist, probably the root span was not available. + // try to search from now to 0 timestamp. + return findSpansByTraceID(ctx, cp, traceID) } // GetTraceList returns multiple traceIDs and spans of them in []*Row format. @@ -420,6 +354,162 @@ func findTraceIDsSplitTimeRange(ctx context.Context, q *logstorage.Query, cp *Co return checkTraceIDList(traceIDList), maxStartTime, nil } +// findTraceIDTimeSplitTimeRange try to search from {trace_id_idx_stream="-"} stream, which contains +// the trace_id and start time of the root span. It returns the start time of the trace if found. +// Otherwise, the root span may not reach VictoriaTraces, and zero time is returned. +func findTraceIDTimeSplitTimeRange(ctx context.Context, q *logstorage.Query, cp *CommonParams) (time.Time, error) { + traceIDStartTimeInt := int64(0) + var missingTimeColumn atomic.Bool + ctxWithCancel, cancel := context.WithCancel(ctx) + + writeBlock := func(_ uint, db *logstorage.DataBlock) { + if missingTimeColumn.Load() { + return + } + + columns := db.Columns + clonedColumnNames := make([]string, len(columns)) + for i, c := range columns { + clonedColumnNames[i] = strings.Clone(c.Name) + } + + timestamps, ok := db.GetTimestamps(nil) + if !ok { + missingTimeColumn.Store(true) + cancel() + return + } + if len(timestamps) > 1 { + logger.Errorf("found multiple trace_id in index, timestamps: %v", timestamps) + } + traceIDStartTimeInt = timestamps[0] + } + + currentTime := time.Now() + startTime := currentTime.Add(-*traceSearchStep) + endTime := currentTime + for startTime.UnixNano() > 0 { // todo: no need to search time range before retention period. + qq := q.CloneWithTimeFilter(currentTime.UnixNano(), startTime.UnixNano(), endTime.UnixNano()) + if err := vtstorage.RunQuery(ctxWithCancel, cp.TenantIDs, qq, writeBlock); err != nil { + return time.Time{}, err + } + if missingTimeColumn.Load() { + return time.Time{}, fmt.Errorf("missing _time column in the result for the query [%s]", qq) + } + + // no hit in this time range, continue with step. + if traceIDStartTimeInt == 0 { + endTime = startTime + startTime = startTime.Add(-*traceSearchStep) + continue + } + + // found result, perform extra search for traceMaxDurationWindow and then break. + return time.Unix(traceIDStartTimeInt/1e9, traceIDStartTimeInt%1e9), nil + } + + return time.Time{}, nil +} + +// findSpanByTraceID search for spans from now to 0 time with steps. +// It stops when rows are found in a time range. +func findSpansByTraceID(ctx context.Context, cp *CommonParams, traceID string) ([]*Row, error) { + // query: trace_id:traceID + currentTime := time.Now() + startTime := currentTime.Add(-*traceSearchStep) + endTime := currentTime + var ( + rows []*Row + err error + ) + for startTime.UnixNano() > 0 { // todo: no need to search time range before retention period. 
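+		// Walk backwards from now in traceSearchStep-sized windows until a window
+		// that contains spans for this trace_id is found.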
+ rows, err = findSpansByTraceIDAndTime(ctx, cp, traceID, startTime, endTime) + if err != nil { + return nil, err + } + // no hit in this time range, continue with step. + if len(rows) == 0 { + endTime = startTime + startTime = startTime.Add(-*traceSearchStep) + continue + } + + // found result, perform extra search for traceMaxDurationWindow and then break. + extraRows, err := findSpansByTraceIDAndTime(ctx, cp, traceID, startTime.Add(-*traceMaxDurationWindow), startTime) + if err != nil { + return nil, err + } + rows = append(rows, extraRows...) + break + } + return rows, nil +} + +// findSpansByTraceIDAndTime search for spans in given time range. +func findSpansByTraceIDAndTime(ctx context.Context, cp *CommonParams, traceID string, startTime, endTime time.Time) ([]*Row, error) { + // query: trace_id:traceID + qStr := fmt.Sprintf(otelpb.TraceIDField+": %q", traceID) + q, err := logstorage.ParseQueryAtTimestamp(qStr, endTime.UnixNano()) + if err != nil { + return nil, fmt.Errorf("cannot parse query [%s]: %s", qStr, err) + } + + ctxWithCancel, cancel := context.WithCancel(ctx) + + // search for trace spans and write to `rows []*Row` + var rowsLock sync.Mutex + var rows []*Row + var missingTimeColumn atomic.Bool + writeBlock := func(_ uint, db *logstorage.DataBlock) { + if missingTimeColumn.Load() { + return + } + + columns := db.Columns + clonedColumnNames := make([]string, len(columns)) + for i, c := range columns { + clonedColumnNames[i] = strings.Clone(c.Name) + } + + timestamps, ok := db.GetTimestamps(nil) + if !ok { + missingTimeColumn.Store(true) + cancel() + return + } + + for i, timestamp := range timestamps { + fields := make([]logstorage.Field, 0, len(columns)) + for j := range columns { + // column could be empty if this span does not contain such field. + // only append non-empty columns. + if columns[j].Values[i] != "" { + fields = append(fields, logstorage.Field{ + Name: clonedColumnNames[j], + Value: strings.Clone(columns[j].Values[i]), + }) + } + } + + rowsLock.Lock() + rows = append(rows, &Row{ + Timestamp: timestamp, + Fields: fields, + }) + rowsLock.Unlock() + } + } + + qq := q.CloneWithTimeFilter(endTime.UnixNano(), startTime.UnixNano(), endTime.UnixNano()) + if err = vtstorage.RunQuery(ctxWithCancel, cp.TenantIDs, qq, writeBlock); err != nil { + return nil, err + } + if missingTimeColumn.Load() { + return nil, fmt.Errorf("missing _time column in the result for the query [%s]", qq) + } + return rows, nil +} + // checkTraceIDList removes invalid `trace_id`. It helps prevent query injection. func checkTraceIDList(traceIDList []string) []string { result := make([]string, 0, len(traceIDList)) diff --git a/lib/protoparser/opentelemetry/pb/trace_fields.go b/lib/protoparser/opentelemetry/pb/trace_fields.go index 5709c9e06..0e1a080d4 100644 --- a/lib/protoparser/opentelemetry/pb/trace_fields.go +++ b/lib/protoparser/opentelemetry/pb/trace_fields.go @@ -2,6 +2,13 @@ package pb // trace_fields.go contains field names when storing OTLP trace span data in VictoriaLogs. 
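+// The constants below name the dedicated stream and field used for the trace_id
+// index entries written at ingestion time; the query path filters on this stream
+// to find a trace's start time before fetching its spans.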
+// Special: TraceID index stream and fields +const ( + TraceIDIndexStreamName = "trace_id_idx_stream" + TraceIDIndexStreamValue = "-" + TraceIDIndexFieldName = "trace_id_idx" +) + // Resource const ( ResourceAttrPrefix = "resource_attr:" From 5832afa6a745b613ddc9ec93a44f6478cf16b2c9 Mon Sep 17 00:00:00 2001 From: Jiekun Date: Mon, 14 Jul 2025 14:06:58 +0800 Subject: [PATCH 2/6] feature: [trace_id idx] make linter happy --- app/vtselect/traces/query/query.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/app/vtselect/traces/query/query.go b/app/vtselect/traces/query/query.go index 310185281..77acf326e 100644 --- a/app/vtselect/traces/query/query.go +++ b/app/vtselect/traces/query/query.go @@ -141,6 +141,9 @@ func GetTrace(ctx context.Context, cp *CommonParams, traceID string) ([]*Row, er // query: {trace_id_idx="-"} AND trace_id:traceID qStr := fmt.Sprintf(`{%s="%s"} AND %s:=%q | fields _time`, otelpb.TraceIDIndexStreamName, otelpb.TraceIDIndexStreamValue, otelpb.TraceIDIndexFieldName, traceID) q, err := logstorage.ParseQueryAtTimestamp(qStr, currentTime.UnixNano()) + if err != nil { + return nil, fmt.Errorf("cannot unmarshal query=%q: %w", qStr, err) + } traceStartTime, err := findTraceIDTimeSplitTimeRange(ctx, q, cp) if err != nil { From 8747735f8b7e37e4d09b6d03c298f77fb75dc67e Mon Sep 17 00:00:00 2001 From: Jiekun Date: Tue, 15 Jul 2025 10:35:45 +0800 Subject: [PATCH 3/6] feature: [ingestion] Add dedup cache and index partition --- app/vtinsert/opentelemetry/opentelemetry.go | 17 +- app/vtselect/traces/query/query.go | 13 +- go.mod | 1 + go.sum | 4 + .../opentelemetry/pb/trace_fields.go | 6 +- .../VictoriaMetrics/fastcache/LICENSE | 22 + .../VictoriaMetrics/fastcache/README.md | 116 +++++ .../VictoriaMetrics/fastcache/bigcache.go | 160 +++++++ .../VictoriaMetrics/fastcache/fastcache.go | 433 ++++++++++++++++++ .../VictoriaMetrics/fastcache/file.go | 431 +++++++++++++++++ .../VictoriaMetrics/fastcache/malloc_heap.go | 12 + .../VictoriaMetrics/fastcache/malloc_mmap.go | 54 +++ vendor/modules.txt | 3 + 13 files changed, 1260 insertions(+), 12 deletions(-) create mode 100644 vendor/github.com/VictoriaMetrics/fastcache/LICENSE create mode 100644 vendor/github.com/VictoriaMetrics/fastcache/README.md create mode 100644 vendor/github.com/VictoriaMetrics/fastcache/bigcache.go create mode 100644 vendor/github.com/VictoriaMetrics/fastcache/fastcache.go create mode 100644 vendor/github.com/VictoriaMetrics/fastcache/file.go create mode 100644 vendor/github.com/VictoriaMetrics/fastcache/malloc_heap.go create mode 100644 vendor/github.com/VictoriaMetrics/fastcache/malloc_mmap.go diff --git a/app/vtinsert/opentelemetry/opentelemetry.go b/app/vtinsert/opentelemetry/opentelemetry.go index 817d3427b..6648f3eaf 100644 --- a/app/vtinsert/opentelemetry/opentelemetry.go +++ b/app/vtinsert/opentelemetry/opentelemetry.go @@ -10,9 +10,12 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" + "github.com/VictoriaMetrics/fastcache" + "github.com/VictoriaMetrics/metrics" + "github.com/cespare/xxhash/v2" + "github.com/VictoriaMetrics/VictoriaTraces/app/vtinsert/insertutil" otelpb "github.com/VictoriaMetrics/VictoriaTraces/lib/protoparser/opentelemetry/pb" - "github.com/VictoriaMetrics/metrics" ) var maxRequestSize = flagutil.NewBytes("opentelemetry.traces.maxRequestSize", 64*1024*1024, "The maximum size in bytes of a single OpenTelemetry trace export 
request.") @@ -26,10 +29,15 @@ var ( var ( mandatoryStreamFields = []string{otelpb.ResourceAttrServiceName, otelpb.NameField} - traceIDIndexStreamFields = []logstorage.Field{{Name: otelpb.TraceIDIndexStreamName, Value: otelpb.TraceIDIndexStreamValue}} + traceIDIndexStreamFields = []logstorage.Field{{Name: otelpb.TraceIDIndexStreamName}} msgFieldValue = "-" ) +var ( + // traceIDCache for deduplicating trace_id + traceIDCache = fastcache.New(64 * 1024 * 1024) +) + // RequestHandler processes Opentelemetry insert requests func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool { switch path { @@ -178,11 +186,12 @@ func pushFieldsFromSpan(span *otelpb.Span, scopeCommonFields []logstorage.Field, lmp.AddRow(int64(span.EndTimeUnixNano), fields, nil) // for root span, create an entity in trace-id-idx stream. - if span.ParentSpanID == "" { + if !traceIDCache.Has([]byte(span.TraceID)) { lmp.AddRow(int64(span.StartTimeUnixNano), []logstorage.Field{ {Name: otelpb.TraceIDIndexFieldName, Value: span.TraceID}, {Name: "_msg", Value: msgFieldValue}, - }, traceIDIndexStreamFields) + }, []logstorage.Field{{Name: otelpb.TraceIDIndexStreamName, Value: strconv.FormatUint(xxhash.Sum64String(span.TraceID)%otelpb.TraceIDIndexPartitionCount, 10)}}) + traceIDCache.Set([]byte(span.TraceID), nil) } return fields } diff --git a/app/vtselect/traces/query/query.go b/app/vtselect/traces/query/query.go index 77acf326e..adbfad1cf 100644 --- a/app/vtselect/traces/query/query.go +++ b/app/vtselect/traces/query/query.go @@ -13,13 +13,14 @@ import ( "github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/cespare/xxhash/v2" "github.com/VictoriaMetrics/VictoriaTraces/app/vtstorage" otelpb "github.com/VictoriaMetrics/VictoriaTraces/lib/protoparser/opentelemetry/pb" ) var ( - traceMaxDurationWindow = flag.Duration("search.traceMaxDurationWindow", 10*time.Minute, "The window of searching for the rest trace spans after finding one span."+ + traceMaxDurationWindow = flag.Duration("search.traceMaxDurationWindow", 45*time.Second, "The window of searching for the rest trace spans after finding one span."+ "It allows extending the search start time and end time by -search.traceMaxDurationWindow to make sure all spans are included."+ "It affects both Jaeger's /api/traces and /api/traces/ APIs.") traceServiceAndSpanNameLookbehind = flag.Duration("search.traceServiceAndSpanNameLookbehind", 7*24*time.Hour, "The time range of searching for service name and span name. 
"+ @@ -138,21 +139,23 @@ func GetSpanNameList(ctx context.Context, cp *CommonParams, serviceName string) func GetTrace(ctx context.Context, cp *CommonParams, traceID string) ([]*Row, error) { currentTime := time.Now() + // possible partition + // query: {trace_id_idx="-"} AND trace_id:traceID - qStr := fmt.Sprintf(`{%s="%s"} AND %s:=%q | fields _time`, otelpb.TraceIDIndexStreamName, otelpb.TraceIDIndexStreamValue, otelpb.TraceIDIndexFieldName, traceID) + qStr := fmt.Sprintf(`{%s="%d"} AND %s:=%q | fields _time`, otelpb.TraceIDIndexStreamName, xxhash.Sum64String(traceID)%otelpb.TraceIDIndexPartitionCount, otelpb.TraceIDIndexFieldName, traceID) q, err := logstorage.ParseQueryAtTimestamp(qStr, currentTime.UnixNano()) if err != nil { return nil, fmt.Errorf("cannot unmarshal query=%q: %w", qStr, err) } - traceStartTime, err := findTraceIDTimeSplitTimeRange(ctx, q, cp) + traceTimestamp, err := findTraceIDTimeSplitTimeRange(ctx, q, cp) if err != nil { return nil, fmt.Errorf("cannot find trace_id %q start time: %s", traceID, err) } // fast path: trace start time found, search in [trace start time, trace start time + *traceMaxDurationWindow] time range. - if !traceStartTime.IsZero() { - return findSpansByTraceIDAndTime(ctx, cp, traceID, traceStartTime, traceStartTime.Add(*traceMaxDurationWindow)) + if !traceTimestamp.IsZero() { + return findSpansByTraceIDAndTime(ctx, cp, traceID, traceTimestamp.Add(-*traceMaxDurationWindow), traceTimestamp.Add(*traceMaxDurationWindow)) } // slow path: if trace start time not exist, probably the root span was not available. // try to search from now to 0 timestamp. diff --git a/go.mod b/go.mod index 8c8ceb22d..fa339f97b 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/VictoriaMetrics/VictoriaLogs v1.25.1 github.com/VictoriaMetrics/VictoriaMetrics v0.0.0-20250714222639-15242a70a79f github.com/VictoriaMetrics/easyproto v0.1.4 + github.com/VictoriaMetrics/fastcache v1.12.5 github.com/VictoriaMetrics/metrics v1.38.0 github.com/cespare/xxhash/v2 v2.3.0 github.com/google/go-cmp v0.7.0 diff --git a/go.sum b/go.sum index 7abe13ecb..85c7be3d4 100644 --- a/go.sum +++ b/go.sum @@ -4,10 +4,14 @@ github.com/VictoriaMetrics/VictoriaMetrics v0.0.0-20250714222639-15242a70a79f h1 github.com/VictoriaMetrics/VictoriaMetrics v0.0.0-20250714222639-15242a70a79f/go.mod h1:sifySYoFINFigscdMlbdMh3LVi7ktP28DAT92f0SsQA= github.com/VictoriaMetrics/easyproto v0.1.4 h1:r8cNvo8o6sR4QShBXQd1bKw/VVLSQma/V2KhTBPf+Sc= github.com/VictoriaMetrics/easyproto v0.1.4/go.mod h1:QlGlzaJnDfFd8Lk6Ci/fuLxfTo3/GThPs2KH23mv710= +github.com/VictoriaMetrics/fastcache v1.12.5 h1:966OX9JjqYmDAFdp3wEXLwzukiHIm+GVlZHv6B8KW3k= +github.com/VictoriaMetrics/fastcache v1.12.5/go.mod h1:K+JGPBn0sueFlLjZ8rcVM0cKkWKNElKyQXmw57QOoYI= github.com/VictoriaMetrics/metrics v1.38.0 h1:1d0dRgVH8Nnu8dKMfisKefPC3q7gqf3/odyO0quAvyA= github.com/VictoriaMetrics/metrics v1.38.0/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8= github.com/VictoriaMetrics/metricsql v0.84.6 h1:r1rl05prim/r+Me4BUULaZQYXn2eZa3dnrtk+hY3X90= github.com/VictoriaMetrics/metricsql v0.84.6/go.mod h1:d4EisFO6ONP/HIGDYTAtwrejJBBeKGQYiRl095bS4QQ= +github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8= +github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod 
h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= diff --git a/lib/protoparser/opentelemetry/pb/trace_fields.go b/lib/protoparser/opentelemetry/pb/trace_fields.go index 0e1a080d4..9106607e1 100644 --- a/lib/protoparser/opentelemetry/pb/trace_fields.go +++ b/lib/protoparser/opentelemetry/pb/trace_fields.go @@ -4,9 +4,9 @@ package pb // Special: TraceID index stream and fields const ( - TraceIDIndexStreamName = "trace_id_idx_stream" - TraceIDIndexStreamValue = "-" - TraceIDIndexFieldName = "trace_id_idx" + TraceIDIndexStreamName = "trace_id_idx_stream" + TraceIDIndexFieldName = "trace_id_idx" + TraceIDIndexPartitionCount = uint64(1024) ) // Resource diff --git a/vendor/github.com/VictoriaMetrics/fastcache/LICENSE b/vendor/github.com/VictoriaMetrics/fastcache/LICENSE new file mode 100644 index 000000000..9a8145e58 --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/fastcache/LICENSE @@ -0,0 +1,22 @@ +The MIT License (MIT) + +Copyright (c) 2018 VictoriaMetrics + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + diff --git a/vendor/github.com/VictoriaMetrics/fastcache/README.md b/vendor/github.com/VictoriaMetrics/fastcache/README.md new file mode 100644 index 000000000..b353214af --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/fastcache/README.md @@ -0,0 +1,116 @@ +[![Build Status](https://github.com/VictoriaMetrics/fastcache/workflows/main/badge.svg)](https://github.com/VictoriaMetrics/fastcache/actions) +[![GoDoc](https://godoc.org/github.com/VictoriaMetrics/fastcache?status.svg)](http://godoc.org/github.com/VictoriaMetrics/fastcache) +[![Go Report](https://goreportcard.com/badge/github.com/VictoriaMetrics/fastcache)](https://goreportcard.com/report/github.com/VictoriaMetrics/fastcache) +[![codecov](https://codecov.io/gh/VictoriaMetrics/fastcache/branch/master/graph/badge.svg)](https://codecov.io/gh/VictoriaMetrics/fastcache) + +# fastcache - fast thread-safe inmemory cache for big number of entries in Go + +### Features + +* Fast. Performance scales on multi-core CPUs. See benchmark results below. +* Thread-safe. Concurrent goroutines may read and write into a single + cache instance. +* The fastcache is designed for storing big number of entries without + [GC overhead](https://syslog.ravelin.com/further-dangers-of-large-heaps-in-go-7a267b57d487). +* Fastcache automatically evicts old entries when reaching the maximum cache size + set on its creation. +* [Simple API](http://godoc.org/github.com/VictoriaMetrics/fastcache). 
+* Simple source code. +* Cache may be [saved to file](https://godoc.org/github.com/VictoriaMetrics/fastcache#Cache.SaveToFile) + and [loaded from file](https://godoc.org/github.com/VictoriaMetrics/fastcache#LoadFromFile). +* Works on [Google AppEngine](https://cloud.google.com/appengine/docs/go/). + + +### Benchmarks + +`Fastcache` performance is compared with [BigCache](https://github.com/allegro/bigcache), standard Go map +and [sync.Map](https://golang.org/pkg/sync/#Map). + +``` +GOMAXPROCS=4 go test github.com/VictoriaMetrics/fastcache -bench='Set|Get' -benchtime=10s +goos: linux +goarch: amd64 +pkg: github.com/VictoriaMetrics/fastcache +BenchmarkBigCacheSet-4 2000 10566656 ns/op 6.20 MB/s 4660369 B/op 6 allocs/op +BenchmarkBigCacheGet-4 2000 6902694 ns/op 9.49 MB/s 684169 B/op 131076 allocs/op +BenchmarkBigCacheSetGet-4 1000 17579118 ns/op 7.46 MB/s 5046744 B/op 131083 allocs/op +BenchmarkCacheSet-4 5000 3808874 ns/op 17.21 MB/s 1142 B/op 2 allocs/op +BenchmarkCacheGet-4 5000 3293849 ns/op 19.90 MB/s 1140 B/op 2 allocs/op +BenchmarkCacheSetGet-4 2000 8456061 ns/op 15.50 MB/s 2857 B/op 5 allocs/op +BenchmarkStdMapSet-4 2000 10559382 ns/op 6.21 MB/s 268413 B/op 65537 allocs/op +BenchmarkStdMapGet-4 5000 2687404 ns/op 24.39 MB/s 2558 B/op 13 allocs/op +BenchmarkStdMapSetGet-4 100 154641257 ns/op 0.85 MB/s 387405 B/op 65558 allocs/op +BenchmarkSyncMapSet-4 500 24703219 ns/op 2.65 MB/s 3426543 B/op 262411 allocs/op +BenchmarkSyncMapGet-4 5000 2265892 ns/op 28.92 MB/s 2545 B/op 79 allocs/op +BenchmarkSyncMapSetGet-4 1000 14595535 ns/op 8.98 MB/s 3417190 B/op 262277 allocs/op +``` + +`MB/s` column here actually means `millions of operations per second`. +As you can see, `fastcache` is faster than the `BigCache` in all the cases. +`fastcache` is faster than the standard Go map and `sync.Map` on workloads +with inserts. + + +### Limitations + +* Keys and values must be byte slices. Other types must be marshaled before + storing them in the cache. +* Big entries with sizes exceeding 64KB must be stored via [distinct API](http://godoc.org/github.com/VictoriaMetrics/fastcache#Cache.SetBig). +* There is no cache expiration. Entries are evicted from the cache only + on cache size overflow. Entry deadline may be stored inside the value in order + to implement cache expiration. + + +### Architecture details + +The cache uses ideas from [BigCache](https://github.com/allegro/bigcache): + +* The cache consists of many buckets, each with its own lock. + This helps scaling the performance on multi-core CPUs, since multiple + CPUs may concurrently access distinct buckets. +* Each bucket consists of a `hash(key) -> (key, value) position` map + and 64KB-sized byte slices (chunks) holding encoded `(key, value)` entries. + Each bucket contains only `O(chunksCount)` pointers. For instance, 64GB cache + would contain ~1M pointers, while similarly-sized `map[string][]byte` + would contain ~1B pointers for short keys and values. This would lead to + [huge GC overhead](https://syslog.ravelin.com/further-dangers-of-large-heaps-in-go-7a267b57d487). + +64KB-sized chunks reduce memory fragmentation and the total memory usage comparing +to a single big chunk per bucket. +Chunks are allocated off-heap if possible. This reduces total memory usage because +GC collects unused memory more frequently without the need in `GOGC` tweaking. + + +### Users + +* `Fastcache` has been extracted from [VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics) sources. 
+ See [this article](https://medium.com/devopslinks/victoriametrics-creating-the-best-remote-storage-for-prometheus-5d92d66787ac) + for more info about `VictoriaMetrics`. + + +### FAQ + +#### What is the difference between `fastcache` and other similar caches like [BigCache](https://github.com/allegro/bigcache) or [FreeCache](https://github.com/coocood/freecache)? + +* `Fastcache` is faster. See benchmark results above. +* `Fastcache` uses less memory due to lower heap fragmentation. This allows + saving many GBs of memory on multi-GB caches. +* `Fastcache` API [is simpler](http://godoc.org/github.com/VictoriaMetrics/fastcache). + The API is designed to be used in zero-allocation mode. + + +#### Why `fastcache` doesn't support cache expiration? + +Because we don't need cache expiration in [VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics). +Cached entries inside `VictoriaMetrics` never expire. They are automatically evicted on cache size overflow. + +It is easy to implement cache expiration on top of `fastcache` by caching values +with marshaled deadlines and verifying deadlines after reading these values +from the cache. + + +#### Why `fastcache` doesn't support advanced features such as [thundering herd protection](https://en.wikipedia.org/wiki/Thundering_herd_problem) or callbacks on entries' eviction? + +Because these features would complicate the code and would make it slower. +`Fastcache` source code is simple - just copy-paste it and implement the feature you want +on top of it. diff --git a/vendor/github.com/VictoriaMetrics/fastcache/bigcache.go b/vendor/github.com/VictoriaMetrics/fastcache/bigcache.go new file mode 100644 index 000000000..ea234b40d --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/fastcache/bigcache.go @@ -0,0 +1,160 @@ +package fastcache + +import ( + "sync" + "sync/atomic" + + xxhash "github.com/cespare/xxhash/v2" +) + +// maxSubvalueLen is the maximum size of subvalue chunk. +// +// - 16 bytes are for subkey encoding +// - 4 bytes are for len(key)+len(value) encoding inside fastcache +// - 1 byte is implementation detail of fastcache +const maxSubvalueLen = chunkSize - 16 - 4 - 1 + +// maxKeyLen is the maximum size of key. +// +// - 16 bytes are for (hash + valueLen) +// - 4 bytes are for len(key)+len(subkey) +// - 1 byte is implementation detail of fastcache +const maxKeyLen = chunkSize - 16 - 4 - 1 + +// SetBig sets (k, v) to c where len(v) may exceed 64KB. +// +// GetBig must be used for reading stored values. +// +// The stored entry may be evicted at any time either due to cache +// overflow or due to unlikely hash collision. +// Pass higher maxBytes value to New if the added items disappear +// frequently. +// +// It is safe to store entries smaller than 64KB with SetBig. +// +// k and v contents may be modified after returning from SetBig. +func (c *Cache) SetBig(k, v []byte) { + atomic.AddUint64(&c.bigStats.SetBigCalls, 1) + if len(k) > maxKeyLen { + atomic.AddUint64(&c.bigStats.TooBigKeyErrors, 1) + return + } + valueLen := len(v) + valueHash := xxhash.Sum64(v) + + // Split v into chunks with up to 64Kb each. + subkey := getSubkeyBuf() + var i uint64 + for len(v) > 0 { + subkey.B = marshalUint64(subkey.B[:0], valueHash) + subkey.B = marshalUint64(subkey.B, uint64(i)) + i++ + subvalueLen := maxSubvalueLen + if len(v) < subvalueLen { + subvalueLen = len(v) + } + subvalue := v[:subvalueLen] + v = v[subvalueLen:] + c.Set(subkey.B, subvalue) + } + + // Write metavalue, which consists of valueHash and valueLen. 
+ subkey.B = marshalUint64(subkey.B[:0], valueHash) + subkey.B = marshalUint64(subkey.B, uint64(valueLen)) + c.Set(k, subkey.B) + putSubkeyBuf(subkey) +} + +// GetBig searches for the value for the given k, appends it to dst +// and returns the result. +// +// GetBig returns only values stored via SetBig. It doesn't work +// with values stored via other methods. +// +// k contents may be modified after returning from GetBig. +func (c *Cache) GetBig(dst, k []byte) (r []byte) { + atomic.AddUint64(&c.bigStats.GetBigCalls, 1) + subkey := getSubkeyBuf() + dstWasNil := dst == nil + defer func() { + putSubkeyBuf(subkey) + if len(r) == 0 && dstWasNil { + // Guarantee that if the caller provided nil and this is a cache miss that + // the caller can accurately test for a cache miss with `if r == nil`. + r = nil + } + }() + + // Read and parse metavalue + subkey.B = c.Get(subkey.B[:0], k) + if len(subkey.B) == 0 { + // Nothing found. + return dst + } + if len(subkey.B) != 16 { + atomic.AddUint64(&c.bigStats.InvalidMetavalueErrors, 1) + return dst + } + valueHash := unmarshalUint64(subkey.B) + valueLen := unmarshalUint64(subkey.B[8:]) + + // Collect result from chunks. + dstLen := len(dst) + if n := dstLen + int(valueLen) - cap(dst); n > 0 { + dst = append(dst[:cap(dst)], make([]byte, n)...) + } + dst = dst[:dstLen] + var i uint64 + for uint64(len(dst)-dstLen) < valueLen { + subkey.B = marshalUint64(subkey.B[:0], valueHash) + subkey.B = marshalUint64(subkey.B, uint64(i)) + i++ + dstNew := c.Get(dst, subkey.B) + if len(dstNew) == len(dst) { + // Cannot find subvalue + return dst[:dstLen] + } + dst = dstNew + } + + // Verify the obtained value. + v := dst[dstLen:] + if uint64(len(v)) != valueLen { + atomic.AddUint64(&c.bigStats.InvalidValueLenErrors, 1) + return dst[:dstLen] + } + h := xxhash.Sum64(v) + if h != valueHash { + atomic.AddUint64(&c.bigStats.InvalidValueHashErrors, 1) + return dst[:dstLen] + } + return dst +} + +func getSubkeyBuf() *bytesBuf { + v := subkeyPool.Get() + if v == nil { + return &bytesBuf{} + } + return v.(*bytesBuf) +} + +func putSubkeyBuf(bb *bytesBuf) { + bb.B = bb.B[:0] + subkeyPool.Put(bb) +} + +var subkeyPool sync.Pool + +type bytesBuf struct { + B []byte +} + +func marshalUint64(dst []byte, u uint64) []byte { + return append(dst, byte(u>>56), byte(u>>48), byte(u>>40), byte(u>>32), byte(u>>24), byte(u>>16), byte(u>>8), byte(u)) +} + +func unmarshalUint64(src []byte) uint64 { + _ = src[7] + return uint64(src[0])<<56 | uint64(src[1])<<48 | uint64(src[2])<<40 | uint64(src[3])<<32 | uint64(src[4])<<24 | uint64(src[5])<<16 | uint64(src[6])<<8 | uint64(src[7]) +} diff --git a/vendor/github.com/VictoriaMetrics/fastcache/fastcache.go b/vendor/github.com/VictoriaMetrics/fastcache/fastcache.go new file mode 100644 index 000000000..046af6ceb --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/fastcache/fastcache.go @@ -0,0 +1,433 @@ +// Package fastcache implements fast in-memory cache. 
+// +// The package has been extracted from https://victoriametrics.com/ +package fastcache + +import ( + "fmt" + "sync" + "sync/atomic" + + xxhash "github.com/cespare/xxhash/v2" +) + +const bucketsCount = 512 + +const chunkSize = 64 * 1024 + +const bucketSizeBits = 40 + +const genSizeBits = 64 - bucketSizeBits + +const maxGen = 1<= maxBucketSize { + panic(fmt.Errorf("too big maxBytes=%d; should be smaller than %d", maxBytes, maxBucketSize)) + } + maxChunks := (maxBytes + chunkSize - 1) / chunkSize + b.chunks = make([][]byte, maxChunks) + b.m = make(map[uint64]uint64) + b.Reset() +} + +func (b *bucket) Reset() { + b.mu.Lock() + chunks := b.chunks + for i := range chunks { + putChunk(chunks[i]) + chunks[i] = nil + } + b.m = make(map[uint64]uint64) + b.idx = 0 + b.gen = 1 + atomic.StoreUint64(&b.getCalls, 0) + atomic.StoreUint64(&b.setCalls, 0) + atomic.StoreUint64(&b.misses, 0) + atomic.StoreUint64(&b.collisions, 0) + atomic.StoreUint64(&b.corruptions, 0) + b.mu.Unlock() +} + +func (b *bucket) cleanLocked() { + bGen := b.gen & ((1 << genSizeBits) - 1) + bIdx := b.idx + bm := b.m + newItems := 0 + for _, v := range bm { + gen := v >> bucketSizeBits + idx := v & ((1 << bucketSizeBits) - 1) + if (gen+1 == bGen || gen == maxGen && bGen == 1) && idx >= bIdx || gen == bGen && idx < bIdx { + newItems++ + } + } + if newItems < len(bm) { + // Re-create b.m with valid items, which weren't expired yet instead of deleting expired items from b.m. + // This should reduce memory fragmentation and the number Go objects behind b.m. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5379 + bmNew := make(map[uint64]uint64, newItems) + for k, v := range bm { + gen := v >> bucketSizeBits + idx := v & ((1 << bucketSizeBits) - 1) + if (gen+1 == bGen || gen == maxGen && bGen == 1) && idx >= bIdx || gen == bGen && idx < bIdx { + bmNew[k] = v + } + } + b.m = bmNew + } +} + +func (b *bucket) UpdateStats(s *Stats) { + s.GetCalls += atomic.LoadUint64(&b.getCalls) + s.SetCalls += atomic.LoadUint64(&b.setCalls) + s.Misses += atomic.LoadUint64(&b.misses) + s.Collisions += atomic.LoadUint64(&b.collisions) + s.Corruptions += atomic.LoadUint64(&b.corruptions) + + b.mu.RLock() + s.EntriesCount += uint64(len(b.m)) + bytesSize := uint64(0) + for _, chunk := range b.chunks { + bytesSize += uint64(cap(chunk)) + } + s.BytesSize += bytesSize + s.MaxBytesSize += uint64(len(b.chunks)) * chunkSize + b.mu.RUnlock() +} + +func (b *bucket) Set(k, v []byte, h uint64) { + atomic.AddUint64(&b.setCalls, 1) + if len(k) >= (1<<16) || len(v) >= (1<<16) { + // Too big key or value - its length cannot be encoded + // with 2 bytes (see below). Skip the entry. + return + } + var kvLenBuf [4]byte + kvLenBuf[0] = byte(uint16(len(k)) >> 8) + kvLenBuf[1] = byte(len(k)) + kvLenBuf[2] = byte(uint16(len(v)) >> 8) + kvLenBuf[3] = byte(len(v)) + kvLen := uint64(len(kvLenBuf) + len(k) + len(v)) + if kvLen >= chunkSize { + // Do not store too big keys and values, since they do not + // fit a chunk. 
+ return + } + + chunks := b.chunks + needClean := false + b.mu.Lock() + idx := b.idx + idxNew := idx + kvLen + chunkIdx := idx / chunkSize + chunkIdxNew := idxNew / chunkSize + if chunkIdxNew > chunkIdx { + if chunkIdxNew >= uint64(len(chunks)) { + idx = 0 + idxNew = kvLen + chunkIdx = 0 + b.gen++ + if b.gen&((1< 0 { + gen := v >> bucketSizeBits + idx := v & ((1 << bucketSizeBits) - 1) + if gen == bGen && idx < b.idx || gen+1 == bGen && idx >= b.idx || gen == maxGen && bGen == 1 && idx >= b.idx { + chunkIdx := idx / chunkSize + if chunkIdx >= uint64(len(chunks)) { + // Corrupted data during the load from file. Just skip it. + atomic.AddUint64(&b.corruptions, 1) + goto end + } + chunk := chunks[chunkIdx] + idx %= chunkSize + if idx+4 >= chunkSize { + // Corrupted data during the load from file. Just skip it. + atomic.AddUint64(&b.corruptions, 1) + goto end + } + kvLenBuf := chunk[idx : idx+4] + keyLen := (uint64(kvLenBuf[0]) << 8) | uint64(kvLenBuf[1]) + valLen := (uint64(kvLenBuf[2]) << 8) | uint64(kvLenBuf[3]) + idx += 4 + if idx+keyLen+valLen >= chunkSize { + // Corrupted data during the load from file. Just skip it. + atomic.AddUint64(&b.corruptions, 1) + goto end + } + if string(k) == string(chunk[idx:idx+keyLen]) { + idx += keyLen + if returnDst { + dst = append(dst, chunk[idx:idx+valLen]...) + } + found = true + } else { + atomic.AddUint64(&b.collisions, 1) + } + } + } +end: + b.mu.RUnlock() + if !found { + atomic.AddUint64(&b.misses, 1) + } + return dst, found +} + +func (b *bucket) Del(h uint64) { + b.mu.Lock() + delete(b.m, h) + b.mu.Unlock() +} diff --git a/vendor/github.com/VictoriaMetrics/fastcache/file.go b/vendor/github.com/VictoriaMetrics/fastcache/file.go new file mode 100644 index 000000000..f47032772 --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/fastcache/file.go @@ -0,0 +1,431 @@ +package fastcache + +import ( + "encoding/binary" + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "regexp" + "runtime" + + "github.com/golang/snappy" +) + +// SaveToFile atomically saves cache data to the given filePath using a single +// CPU core. +// +// SaveToFile may be called concurrently with other operations on the cache. +// +// The saved data may be loaded with LoadFromFile*. +// +// See also SaveToFileConcurrent for faster saving to file. +func (c *Cache) SaveToFile(filePath string) error { + return c.SaveToFileConcurrent(filePath, 1) +} + +// SaveToFileConcurrent saves cache data to the given filePath using concurrency +// CPU cores. +// +// SaveToFileConcurrent may be called concurrently with other operations +// on the cache. +// +// The saved data may be loaded with LoadFromFile*. +// +// See also SaveToFile. +func (c *Cache) SaveToFileConcurrent(filePath string, concurrency int) error { + // Create dir if it doesn't exist. + dir := filepath.Dir(filePath) + if _, err := os.Stat(dir); err != nil { + if !os.IsNotExist(err) { + return fmt.Errorf("cannot stat %q: %s", dir, err) + } + if err := os.MkdirAll(dir, 0755); err != nil { + return fmt.Errorf("cannot create dir %q: %s", dir, err) + } + } + + // Save cache data into a temporary directory. 
+ tmpDir, err := ioutil.TempDir(dir, "fastcache.tmp.") + if err != nil { + return fmt.Errorf("cannot create temporary dir inside %q: %s", dir, err) + } + defer func() { + if tmpDir != "" { + _ = os.RemoveAll(tmpDir) + } + }() + gomaxprocs := runtime.GOMAXPROCS(-1) + if concurrency <= 0 || concurrency > gomaxprocs { + concurrency = gomaxprocs + } + if err := c.save(tmpDir, concurrency); err != nil { + return fmt.Errorf("cannot save cache data to temporary dir %q: %s", tmpDir, err) + } + + // Remove old filePath contents, since os.Rename may return + // error if filePath dir exists. + if err := os.RemoveAll(filePath); err != nil { + return fmt.Errorf("cannot remove old contents at %q: %s", filePath, err) + } + if err := os.Rename(tmpDir, filePath); err != nil { + return fmt.Errorf("cannot move temporary dir %q to %q: %s", tmpDir, filePath, err) + } + tmpDir = "" + return nil +} + +// LoadFromFileMaxBytes loads cache data from the specified filePath, +// enforcing that the cache capacity matches the provided maxBytes value. +// +// Returns an error if the stored cache's capacity differs from maxBytes. +// +// See SaveToFile* for functions that persist cache data to a file. +func LoadFromFileMaxBytes(filePath string, maxBytes int) (*Cache, error) { + return load(filePath, maxBytes) +} + +// LoadFromFile loads cache data from the given filePath. +// +// See SaveToFile* for saving cache data to file. +func LoadFromFile(filePath string) (*Cache, error) { + return load(filePath, 0) +} + +// LoadFromFileOrNew tries loading cache data from the given filePath. +// +// The function falls back to creating new cache with the given maxBytes +// capacity if error occurs during loading the cache from file. +func LoadFromFileOrNew(filePath string, maxBytes int) *Cache { + c, err := load(filePath, maxBytes) + if err == nil { + return c + } + return New(maxBytes) +} + +func (c *Cache) save(dir string, workersCount int) error { + if err := saveMetadata(c, dir); err != nil { + return err + } + + // Save buckets by workersCount concurrent workers. + workCh := make(chan int, workersCount) + results := make(chan error) + for i := 0; i < workersCount; i++ { + go func(workerNum int) { + results <- saveBuckets(c.buckets[:], workCh, dir, workerNum) + }(i) + } + // Feed workers with work + for i := range c.buckets[:] { + workCh <- i + } + close(workCh) + + // Read results. + var err error + for i := 0; i < workersCount; i++ { + result := <-results + if result != nil && err == nil { + err = result + } + } + return err +} + +func load(filePath string, maxBytes int) (*Cache, error) { + maxBucketChunks, err := loadMetadata(filePath) + if err != nil { + return nil, err + } + if maxBytes > 0 { + maxBucketBytes := uint64((maxBytes + bucketsCount - 1) / bucketsCount) + expectedBucketChunks := (maxBucketBytes + chunkSize - 1) / chunkSize + if maxBucketChunks != expectedBucketChunks { + return nil, fmt.Errorf("cache file %s contains maxBytes=%d; want %d", filePath, maxBytes, expectedBucketChunks*chunkSize*bucketsCount) + } + } + + // Read bucket files from filePath dir. 
+ d, err := os.Open(filePath) + if err != nil { + return nil, fmt.Errorf("cannot open %q: %w", filePath, err) + } + defer func() { + _ = d.Close() + }() + fis, err := d.Readdir(-1) + if err != nil { + return nil, fmt.Errorf("cannot read files from %q: %s", filePath, err) + } + results := make(chan error) + workersCount := 0 + var c Cache + for _, fi := range fis { + fn := fi.Name() + if fi.IsDir() || !dataFileRegexp.MatchString(fn) { + continue + } + workersCount++ + go func(dataPath string) { + results <- loadBuckets(c.buckets[:], dataPath, maxBucketChunks) + }(filePath + "/" + fn) + } + err = nil + for i := 0; i < workersCount; i++ { + result := <-results + if result != nil && err == nil { + err = result + } + } + if err != nil { + return nil, err + } + // Initialize buckets, which could be missing due to incomplete or corrupted files in the cache. + // It is better initializing such buckets instead of returning error, since the rest of buckets + // contain valid data. + for i := range c.buckets[:] { + b := &c.buckets[i] + if len(b.chunks) == 0 { + b.chunks = make([][]byte, maxBucketChunks) + b.m = make(map[uint64]uint64) + } + } + return &c, nil +} + +func saveMetadata(c *Cache, dir string) error { + metadataPath := dir + "/metadata.bin" + metadataFile, err := os.Create(metadataPath) + if err != nil { + return fmt.Errorf("cannot create %q: %s", metadataPath, err) + } + defer func() { + _ = metadataFile.Close() + }() + maxBucketChunks := uint64(cap(c.buckets[0].chunks)) + if err := writeUint64(metadataFile, maxBucketChunks); err != nil { + return fmt.Errorf("cannot write maxBucketChunks=%d to %q: %s", maxBucketChunks, metadataPath, err) + } + return nil +} + +func loadMetadata(dir string) (uint64, error) { + metadataPath := dir + "/metadata.bin" + metadataFile, err := os.Open(metadataPath) + if err != nil { + return 0, fmt.Errorf("cannot open %q: %w", metadataPath, err) + } + defer func() { + _ = metadataFile.Close() + }() + maxBucketChunks, err := readUint64(metadataFile) + if err != nil { + return 0, fmt.Errorf("cannot read maxBucketChunks from %q: %s", metadataPath, err) + } + if maxBucketChunks == 0 { + return 0, fmt.Errorf("invalid maxBucketChunks=0 read from %q", metadataPath) + } + return maxBucketChunks, nil +} + +var dataFileRegexp = regexp.MustCompile(`^data\.\d+\.bin$`) + +func saveBuckets(buckets []bucket, workCh <-chan int, dir string, workerNum int) error { + dataPath := fmt.Sprintf("%s/data.%d.bin", dir, workerNum) + dataFile, err := os.Create(dataPath) + if err != nil { + return fmt.Errorf("cannot create %q: %s", dataPath, err) + } + defer func() { + _ = dataFile.Close() + }() + zw := snappy.NewBufferedWriter(dataFile) + for bucketNum := range workCh { + if err := writeUint64(zw, uint64(bucketNum)); err != nil { + return fmt.Errorf("cannot write bucketNum=%d to %q: %s", bucketNum, dataPath, err) + } + if err := buckets[bucketNum].Save(zw); err != nil { + return fmt.Errorf("cannot save bucket[%d] to %q: %s", bucketNum, dataPath, err) + } + } + if err := zw.Close(); err != nil { + return fmt.Errorf("cannot close snappy.Writer for %q: %s", dataPath, err) + } + return nil +} + +func loadBuckets(buckets []bucket, dataPath string, maxChunks uint64) error { + dataFile, err := os.Open(dataPath) + if err != nil { + return fmt.Errorf("cannot open %q: %s", dataPath, err) + } + defer func() { + _ = dataFile.Close() + }() + zr := snappy.NewReader(dataFile) + for { + bucketNum, err := readUint64(zr) + if err == io.EOF { + // Reached the end of file. 
+ return nil + } + if bucketNum >= uint64(len(buckets)) { + return fmt.Errorf("unexpected bucketNum read from %q: %d; must be smaller than %d", dataPath, bucketNum, len(buckets)) + } + if err := buckets[bucketNum].Load(zr, maxChunks); err != nil { + return fmt.Errorf("cannot load bucket[%d] from %q: %s", bucketNum, dataPath, err) + } + } +} + +func (b *bucket) Save(w io.Writer) error { + b.mu.Lock() + b.cleanLocked() + b.mu.Unlock() + + b.mu.RLock() + defer b.mu.RUnlock() + + // Store b.idx, b.gen and b.m to w. + + bIdx := b.idx + bGen := b.gen + chunksLen := 0 + for _, chunk := range b.chunks { + if chunk == nil { + break + } + chunksLen++ + } + kvs := make([]byte, 0, 2*8*len(b.m)) + var u64Buf [8]byte + for k, v := range b.m { + binary.LittleEndian.PutUint64(u64Buf[:], k) + kvs = append(kvs, u64Buf[:]...) + binary.LittleEndian.PutUint64(u64Buf[:], v) + kvs = append(kvs, u64Buf[:]...) + } + + if err := writeUint64(w, bIdx); err != nil { + return fmt.Errorf("cannot write b.idx: %s", err) + } + if err := writeUint64(w, bGen); err != nil { + return fmt.Errorf("cannot write b.gen: %s", err) + } + if err := writeUint64(w, uint64(len(kvs))/2/8); err != nil { + return fmt.Errorf("cannot write len(b.m): %s", err) + } + if _, err := w.Write(kvs); err != nil { + return fmt.Errorf("cannot write b.m: %s", err) + } + + // Store b.chunks to w. + if err := writeUint64(w, uint64(chunksLen)); err != nil { + return fmt.Errorf("cannot write len(b.chunks): %s", err) + } + for chunkIdx := 0; chunkIdx < chunksLen; chunkIdx++ { + chunk := b.chunks[chunkIdx][:chunkSize] + if _, err := w.Write(chunk); err != nil { + return fmt.Errorf("cannot write b.chunks[%d]: %s", chunkIdx, err) + } + } + + return nil +} + +func (b *bucket) Load(r io.Reader, maxChunks uint64) error { + if maxChunks == 0 { + return fmt.Errorf("the number of chunks per bucket cannot be zero") + } + bIdx, err := readUint64(r) + if err != nil { + return fmt.Errorf("cannot read b.idx: %s", err) + } + bGen, err := readUint64(r) + if err != nil { + return fmt.Errorf("cannot read b.gen: %s", err) + } + kvsLen, err := readUint64(r) + if err != nil { + return fmt.Errorf("cannot read len(b.m): %s", err) + } + kvsLen *= 2 * 8 + kvs := make([]byte, kvsLen) + if _, err := io.ReadFull(r, kvs); err != nil { + return fmt.Errorf("cannot read b.m: %s", err) + } + m := make(map[uint64]uint64, kvsLen/2/8) + for len(kvs) > 0 { + k := binary.LittleEndian.Uint64(kvs) + kvs = kvs[8:] + v := binary.LittleEndian.Uint64(kvs) + kvs = kvs[8:] + m[k] = v + } + + maxBytes := maxChunks * chunkSize + if maxBytes >= maxBucketSize { + return fmt.Errorf("too big maxBytes=%d; should be smaller than %d", maxBytes, maxBucketSize) + } + chunks := make([][]byte, maxChunks) + chunksLen, err := readUint64(r) + if err != nil { + return fmt.Errorf("cannot read len(b.chunks): %s", err) + } + if chunksLen > uint64(maxChunks) { + return fmt.Errorf("chunksLen=%d cannot exceed maxChunks=%d", chunksLen, maxChunks) + } + currChunkIdx := bIdx / chunkSize + if currChunkIdx > 0 && currChunkIdx >= chunksLen { + return fmt.Errorf("too big bIdx=%d; should be smaller than %d", bIdx, chunksLen*chunkSize) + } + for chunkIdx := uint64(0); chunkIdx < chunksLen; chunkIdx++ { + chunk := getChunk() + chunks[chunkIdx] = chunk + if _, err := io.ReadFull(r, chunk); err != nil { + // Free up allocated chunks before returning the error. 
+ for _, chunk := range chunks { + if chunk != nil { + putChunk(chunk) + } + } + return fmt.Errorf("cannot read b.chunks[%d]: %s", chunkIdx, err) + } + } + // Adjust len for the chunk pointed by currChunkIdx. + if chunksLen > 0 { + chunkLen := bIdx % chunkSize + chunks[currChunkIdx] = chunks[currChunkIdx][:chunkLen] + } + + b.mu.Lock() + for _, chunk := range b.chunks { + putChunk(chunk) + } + b.chunks = chunks + b.m = m + b.idx = bIdx + b.gen = bGen + b.mu.Unlock() + + return nil +} + +func writeUint64(w io.Writer, u uint64) error { + var u64Buf [8]byte + binary.LittleEndian.PutUint64(u64Buf[:], u) + _, err := w.Write(u64Buf[:]) + return err +} + +func readUint64(r io.Reader) (uint64, error) { + var u64Buf [8]byte + if _, err := io.ReadFull(r, u64Buf[:]); err != nil { + return 0, err + } + u := binary.LittleEndian.Uint64(u64Buf[:]) + return u, nil +} diff --git a/vendor/github.com/VictoriaMetrics/fastcache/malloc_heap.go b/vendor/github.com/VictoriaMetrics/fastcache/malloc_heap.go new file mode 100644 index 000000000..810d460b7 --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/fastcache/malloc_heap.go @@ -0,0 +1,12 @@ +//go:build appengine || windows +// +build appengine windows + +package fastcache + +func getChunk() []byte { + return make([]byte, chunkSize) +} + +func putChunk(chunk []byte) { + // No-op. +} diff --git a/vendor/github.com/VictoriaMetrics/fastcache/malloc_mmap.go b/vendor/github.com/VictoriaMetrics/fastcache/malloc_mmap.go new file mode 100644 index 000000000..e24d578bf --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/fastcache/malloc_mmap.go @@ -0,0 +1,54 @@ +//go:build !appengine && !windows +// +build !appengine,!windows + +package fastcache + +import ( + "fmt" + "sync" + "unsafe" + + "golang.org/x/sys/unix" +) + +const chunksPerAlloc = 1024 + +var ( + freeChunks []*[chunkSize]byte + freeChunksLock sync.Mutex +) + +func getChunk() []byte { + freeChunksLock.Lock() + if len(freeChunks) == 0 { + // Allocate offheap memory, so GOGC won't take into account cache size. + // This should reduce free memory waste. 
+ data, err := unix.Mmap(-1, 0, chunkSize*chunksPerAlloc, unix.PROT_READ|unix.PROT_WRITE, unix.MAP_ANON|unix.MAP_PRIVATE) + if err != nil { + panic(fmt.Errorf("cannot allocate %d bytes via mmap: %s", chunkSize*chunksPerAlloc, err)) + } + for len(data) > 0 { + p := (*[chunkSize]byte)(unsafe.Pointer(&data[0])) + freeChunks = append(freeChunks, p) + data = data[chunkSize:] + } + } + n := len(freeChunks) - 1 + p := freeChunks[n] + freeChunks[n] = nil + freeChunks = freeChunks[:n] + freeChunksLock.Unlock() + return p[:] +} + +func putChunk(chunk []byte) { + if chunk == nil { + return + } + chunk = chunk[:chunkSize] + p := (*[chunkSize]byte)(unsafe.Pointer(&chunk[0])) + + freeChunksLock.Lock() + freeChunks = append(freeChunks, p) + freeChunksLock.Unlock() +} diff --git a/vendor/modules.txt b/vendor/modules.txt index d8ced56db..2fa2131e7 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -45,6 +45,9 @@ github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter # github.com/VictoriaMetrics/easyproto v0.1.4 ## explicit; go 1.18 github.com/VictoriaMetrics/easyproto +# github.com/VictoriaMetrics/fastcache v1.12.5 +## explicit; go 1.24.0 +github.com/VictoriaMetrics/fastcache # github.com/VictoriaMetrics/metrics v1.38.0 ## explicit; go 1.17 github.com/VictoriaMetrics/metrics From 3122d00c94a664861e4c978f6f1d8e44785e1f5e Mon Sep 17 00:00:00 2001 From: Jiekun Date: Thu, 17 Jul 2025 09:36:08 +0800 Subject: [PATCH 4/6] feature: [trace-id index] reduce trace id dedup cache to 32MiB --- app/vtinsert/opentelemetry/opentelemetry.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/vtinsert/opentelemetry/opentelemetry.go b/app/vtinsert/opentelemetry/opentelemetry.go index 6648f3eaf..86b9328ef 100644 --- a/app/vtinsert/opentelemetry/opentelemetry.go +++ b/app/vtinsert/opentelemetry/opentelemetry.go @@ -35,7 +35,7 @@ var ( var ( // traceIDCache for deduplicating trace_id - traceIDCache = fastcache.New(64 * 1024 * 1024) + traceIDCache = fastcache.New(32 * 1024 * 1024) ) // RequestHandler processes Opentelemetry insert requests From 6d0cd6ea5aa0d0927b41a04f2c50990b82eaf093 Mon Sep 17 00:00:00 2001 From: Jiekun Date: Fri, 18 Jul 2025 22:23:18 +0800 Subject: [PATCH 5/6] feature: [trace id index] add debug log --- app/vtselect/traces/query/query.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/app/vtselect/traces/query/query.go b/app/vtselect/traces/query/query.go index adbfad1cf..d53938237 100644 --- a/app/vtselect/traces/query/query.go +++ b/app/vtselect/traces/query/query.go @@ -147,15 +147,19 @@ func GetTrace(ctx context.Context, cp *CommonParams, traceID string) ([]*Row, er if err != nil { return nil, fmt.Errorf("cannot unmarshal query=%q: %w", qStr, err) } - + startTime1 := time.Now() traceTimestamp, err := findTraceIDTimeSplitTimeRange(ctx, q, cp) if err != nil { return nil, fmt.Errorf("cannot find trace_id %q start time: %s", traceID, err) } + logger.Infof("find trace id time done: %.3fms", time.Since(startTime1).Seconds()) // fast path: trace start time found, search in [trace start time, trace start time + *traceMaxDurationWindow] time range. 
if !traceTimestamp.IsZero() { - return findSpansByTraceIDAndTime(ctx, cp, traceID, traceTimestamp.Add(-*traceMaxDurationWindow), traceTimestamp.Add(*traceMaxDurationWindow)) + startTime2 := time.Now() + result, err := findSpansByTraceIDAndTime(ctx, cp, traceID, traceTimestamp.Add(-*traceMaxDurationWindow), traceTimestamp.Add(*traceMaxDurationWindow)) + logger.Infof("find all spans done: %.3fms", time.Since(startTime2).Seconds()) + return result, err } // slow path: if trace start time not exist, probably the root span was not available. // try to search from now to 0 timestamp. From 8c5a54ead6c89db5c9b85bd35447c4e5a033211e Mon Sep 17 00:00:00 2001 From: Jiekun Date: Fri, 18 Jul 2025 22:44:29 +0800 Subject: [PATCH 6/6] feature: [trace id index] revert debug log --- app/vtselect/traces/query/query.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/app/vtselect/traces/query/query.go b/app/vtselect/traces/query/query.go index d53938237..c85b5f847 100644 --- a/app/vtselect/traces/query/query.go +++ b/app/vtselect/traces/query/query.go @@ -147,19 +147,14 @@ func GetTrace(ctx context.Context, cp *CommonParams, traceID string) ([]*Row, er if err != nil { return nil, fmt.Errorf("cannot unmarshal query=%q: %w", qStr, err) } - startTime1 := time.Now() traceTimestamp, err := findTraceIDTimeSplitTimeRange(ctx, q, cp) if err != nil { return nil, fmt.Errorf("cannot find trace_id %q start time: %s", traceID, err) } - logger.Infof("find trace id time done: %.3fms", time.Since(startTime1).Seconds()) // fast path: trace start time found, search in [trace start time, trace start time + *traceMaxDurationWindow] time range. if !traceTimestamp.IsZero() { - startTime2 := time.Now() - result, err := findSpansByTraceIDAndTime(ctx, cp, traceID, traceTimestamp.Add(-*traceMaxDurationWindow), traceTimestamp.Add(*traceMaxDurationWindow)) - logger.Infof("find all spans done: %.3fms", time.Since(startTime2).Seconds()) - return result, err + return findSpansByTraceIDAndTime(ctx, cp, traceID, traceTimestamp.Add(-*traceMaxDurationWindow), traceTimestamp.Add(*traceMaxDurationWindow)) } // slow path: if trace start time not exist, probably the root span was not available. // try to search from now to 0 timestamp.
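Below is a minimal, self-contained sketch (not part of the patch series) of the two mechanisms these patches introduce: the xxhash-based partitioning of the trace_id index stream and the fastcache-backed dedup gate on the ingestion path. The constants and the LogsQL query shape are taken from the patches above; the helper names (`indexPartition`, `buildIndexQuery`, `shouldWriteIndexRow`, `traceIDSeen`) are illustrative and do not exist in the repository.

```go
package main

import (
	"fmt"

	"github.com/VictoriaMetrics/fastcache"
	"github.com/cespare/xxhash/v2"
)

// Values copied from lib/protoparser/opentelemetry/pb/trace_fields.go in this patch series.
const (
	traceIDIndexStreamName     = "trace_id_idx_stream"
	traceIDIndexFieldName      = "trace_id_idx"
	traceIDIndexPartitionCount = uint64(1024)
)

// indexPartition maps a trace_id to one of the index-stream partitions using the
// same hash-and-modulo scheme as the ingestion path.
func indexPartition(traceID string) uint64 {
	return xxhash.Sum64String(traceID) % traceIDIndexPartitionCount
}

// buildIndexQuery builds the LogsQL query that GetTrace issues against the index
// stream to resolve the trace's start time.
func buildIndexQuery(traceID string) string {
	return fmt.Sprintf(`{%s="%d"} AND %s:=%q | fields _time`,
		traceIDIndexStreamName, indexPartition(traceID), traceIDIndexFieldName, traceID)
}

// traceIDSeen plays the role of the ingestion-side dedup cache (32 MiB in the final patch).
var traceIDSeen = fastcache.New(32 * 1024 * 1024)

// shouldWriteIndexRow reports whether an index row should be written for traceID,
// mirroring the Has/Set gate in pushFieldsFromSpan.
func shouldWriteIndexRow(traceID string) bool {
	if traceIDSeen.Has([]byte(traceID)) {
		return false
	}
	traceIDSeen.Set([]byte(traceID), nil)
	return true
}

func main() {
	traceID := "4bf92f3577b34da6a3ce929d0e0e4736" // example trace id

	fmt.Println("partition:", indexPartition(traceID))
	fmt.Println("query:    ", buildIndexQuery(traceID))
	fmt.Println("write idx:", shouldWriteIndexRow(traceID)) // true on first sight
	fmt.Println("write idx:", shouldWriteIndexRow(traceID)) // false afterwards (deduped)
}
```

Partitioning spreads index entries across 1024 streams so each lookup can use an exact stream filter, while the dedup cache keeps ingestion from writing one index row per span of the same trace.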