Skip to content

feature: [trace_id idx] add trace_id index to accelerate query execution #22

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions app/vtinsert/opentelemetry/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/fastcache"
"github.com/VictoriaMetrics/metrics"
"github.com/cespare/xxhash/v2"

"github.com/VictoriaMetrics/VictoriaTraces/app/vtinsert/insertutil"
otelpb "github.com/VictoriaMetrics/VictoriaTraces/lib/protoparser/opentelemetry/pb"
Expand All @@ -26,8 +28,14 @@
)

var (
mandatoryStreamFields = []string{otelpb.ResourceAttrServiceName, otelpb.NameField}
msgFieldValue = "-"
mandatoryStreamFields = []string{otelpb.ResourceAttrServiceName, otelpb.NameField}
traceIDIndexStreamFields = []logstorage.Field{{Name: otelpb.TraceIDIndexStreamName}}

Check failure on line 32 in app/vtinsert/opentelemetry/opentelemetry.go

View workflow job for this annotation

GitHub Actions / lint

var traceIDIndexStreamFields is unused (unused)
msgFieldValue = "-"
)

var (
// traceIDCache remembers the trace IDs for which a row has already been added to the
// trace_id index stream, so that only the first span seen for a trace ID in this process
// adds an index row. It is created with fastcache.New(32 * 1024 * 1024).
// NOTE(review): presumably the argument is the cache size limit in bytes and entries may be
// evicted once it is full, in which case a duplicate index row would be written - confirm.
traceIDCache = fastcache.New(32 * 1024 * 1024)
)

// RequestHandler processes Opentelemetry insert requests
Expand Down Expand Up @@ -176,6 +184,15 @@
Value: msgFieldValue,
})
lmp.AddRow(int64(span.EndTimeUnixNano), fields, nil)

// for the first span seen for a trace_id (no root-span check is done here), add an entry to the trace_id index stream.
if !traceIDCache.Has([]byte(span.TraceID)) {
lmp.AddRow(int64(span.StartTimeUnixNano), []logstorage.Field{
{Name: otelpb.TraceIDIndexFieldName, Value: span.TraceID},
{Name: "_msg", Value: msgFieldValue},
}, []logstorage.Field{{Name: otelpb.TraceIDIndexStreamName, Value: strconv.FormatUint(xxhash.Sum64String(span.TraceID)%otelpb.TraceIDIndexPartitionCount, 10)}})
traceIDCache.Set([]byte(span.TraceID), nil)
}
return fields
}

Expand Down
251 changes: 173 additions & 78 deletions app/vtselect/traces/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ import (
"time"

"github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/cespare/xxhash/v2"

"github.com/VictoriaMetrics/VictoriaTraces/app/vtstorage"
otelpb "github.com/VictoriaMetrics/VictoriaTraces/lib/protoparser/opentelemetry/pb"
)

var (
traceMaxDurationWindow = flag.Duration("search.traceMaxDurationWindow", 10*time.Minute, "The window of searching for the rest trace spans after finding one span."+
traceMaxDurationWindow = flag.Duration("search.traceMaxDurationWindow", 45*time.Second, "The window of searching for the rest trace spans after finding one span."+
"It allows extending the search start time and end time by -search.traceMaxDurationWindow to make sure all spans are included."+
"It affects both Jaeger's /api/traces and /api/traces/<trace_id> APIs.")
traceServiceAndSpanNameLookbehind = flag.Duration("search.traceServiceAndSpanNameLookbehind", 7*24*time.Hour, "The time range of searching for service name and span name. "+
Expand Down Expand Up @@ -137,89 +139,26 @@ func GetSpanNameList(ctx context.Context, cp *CommonParams, serviceName string)
func GetTrace(ctx context.Context, cp *CommonParams, traceID string) ([]*Row, error) {
currentTime := time.Now()

// query: trace_id:traceID
qStr := fmt.Sprintf(otelpb.TraceIDField+": %q", traceID)
// possible partition

// query: {trace_id_idx_stream="<xxhash(traceID) % TraceIDIndexPartitionCount>"} AND trace_id_idx:="traceID" | fields _time
qStr := fmt.Sprintf(`{%s="%d"} AND %s:=%q | fields _time`, otelpb.TraceIDIndexStreamName, xxhash.Sum64String(traceID)%otelpb.TraceIDIndexPartitionCount, otelpb.TraceIDIndexFieldName, traceID)
q, err := logstorage.ParseQueryAtTimestamp(qStr, currentTime.UnixNano())
if err != nil {
return nil, fmt.Errorf("cannot parse query [%s]: %s", qStr, err)
return nil, fmt.Errorf("cannot unmarshal query=%q: %w", qStr, err)
}

ctxWithCancel, cancel := context.WithCancel(ctx)

// search for trace spans and write to `rows []*Row`
var rowsLock sync.Mutex
var rows []*Row
var missingTimeColumn atomic.Bool
writeBlock := func(_ uint, db *logstorage.DataBlock) {
if missingTimeColumn.Load() {
return
}

columns := db.Columns
clonedColumnNames := make([]string, len(columns))
for i, c := range columns {
clonedColumnNames[i] = strings.Clone(c.Name)
}

timestamps, ok := db.GetTimestamps(nil)
if !ok {
missingTimeColumn.Store(true)
cancel()
return
}

for i, timestamp := range timestamps {
fields := make([]logstorage.Field, 0, len(columns))
for j := range columns {
// column could be empty if this span does not contain such field.
// only append non-empty columns.
if columns[j].Values[i] != "" {
fields = append(fields, logstorage.Field{
Name: clonedColumnNames[j],
Value: strings.Clone(columns[j].Values[i]),
})
}
}

rowsLock.Lock()
rows = append(rows, &Row{
Timestamp: timestamp,
Fields: fields,
})
rowsLock.Unlock()
}
traceTimestamp, err := findTraceIDTimeSplitTimeRange(ctx, q, cp)
if err != nil {
return nil, fmt.Errorf("cannot find trace_id %q start time: %s", traceID, err)
}

startTime := currentTime.Add(-*traceSearchStep)
endTime := currentTime
for startTime.UnixNano() > 0 { // todo: no need to search time range before retention period.
qq := q.CloneWithTimeFilter(currentTime.UnixNano(), startTime.UnixNano(), endTime.UnixNano())
if err = vtstorage.RunQuery(ctxWithCancel, cp.TenantIDs, qq, writeBlock); err != nil {
return nil, err
}
if missingTimeColumn.Load() {
return nil, fmt.Errorf("missing _time column in the result for the query [%s]", qq)
}

// no hit in this time range, continue with step.
if len(rows) == 0 {
endTime = startTime
startTime = startTime.Add(-*traceSearchStep)
continue
}

// found result, perform extra search for traceMaxDurationWindow and then break.
qq = q.CloneWithTimeFilter(currentTime.UnixNano(), startTime.Add(-*traceMaxDurationWindow).UnixNano(), startTime.UnixNano())
if err = vtstorage.RunQuery(ctxWithCancel, cp.TenantIDs, qq, writeBlock); err != nil {
return nil, err
}
if missingTimeColumn.Load() {
return nil, fmt.Errorf("missing _time column in the result for the query [%s]", qq)
}
break
// fast path: trace start time found, search in [trace start time, trace start time + *traceMaxDurationWindow] time range.
if !traceTimestamp.IsZero() {
return findSpansByTraceIDAndTime(ctx, cp, traceID, traceTimestamp.Add(-*traceMaxDurationWindow), traceTimestamp.Add(*traceMaxDurationWindow))
}

return rows, nil
// slow path: if trace start time not exist, probably the root span was not available.
// try to search from now to 0 timestamp.
return findSpansByTraceID(ctx, cp, traceID)
}

// GetTraceList returns multiple traceIDs and spans of them in []*Row format.
Expand Down Expand Up @@ -420,6 +359,162 @@ func findTraceIDsSplitTimeRange(ctx context.Context, q *logstorage.Query, cp *Co
return checkTraceIDList(traceIDList), maxStartTime, nil
}

// findTraceIDTimeSplitTimeRange runs the trace_id index query q and returns the timestamp
// of the index entry it finds.
//
// The search walks backwards from now towards the zero timestamp in windows of
// *traceSearchStep and stops at the first window that yields a hit.
//
// It returns:
//   - the timestamp of the index entry and a nil error when an entry is found;
//   - a zero time.Time and a nil error when no entry exists in any window, so the caller
//     can fall back to a full search;
//   - a zero time.Time and a non-nil error when the query fails or a result block has no
//     _time column.
func findTraceIDTimeSplitTimeRange(ctx context.Context, q *logstorage.Query, cp *CommonParams) (time.Time, error) {
	ctxWithCancel, cancel := context.WithCancel(ctx)
	// Release the derived context on every return path, not only on the missing-column path.
	defer cancel()

	// The callback state is kept in atomics.
	// NOTE(review): writeBlock is presumably invoked from multiple workers concurrently
	// (the sibling span search guards its callback state with a mutex) - confirm.
	var traceIDStartTime atomic.Int64
	var missingTimeColumn atomic.Bool

	writeBlock := func(_ uint, db *logstorage.DataBlock) {
		if missingTimeColumn.Load() {
			return
		}

		timestamps, ok := db.GetTimestamps(nil)
		if !ok {
			missingTimeColumn.Store(true)
			cancel()
			return
		}
		if len(timestamps) == 0 {
			// Nothing to record; avoid indexing into an empty slice.
			return
		}
		if len(timestamps) > 1 {
			logger.Errorf("found multiple trace_id in index, timestamps: %v", timestamps)
		}
		traceIDStartTime.Store(timestamps[0])
	}

	currentTime := time.Now()
	startTime := currentTime.Add(-*traceSearchStep)
	endTime := currentTime
	for startTime.UnixNano() > 0 { // todo: no need to search time range before retention period.
		qq := q.CloneWithTimeFilter(currentTime.UnixNano(), startTime.UnixNano(), endTime.UnixNano())
		if err := vtstorage.RunQuery(ctxWithCancel, cp.TenantIDs, qq, writeBlock); err != nil {
			return time.Time{}, err
		}
		if missingTimeColumn.Load() {
			return time.Time{}, fmt.Errorf("missing _time column in the result for the query [%s]", qq)
		}

		// A zero value means no index entry was seen in this window.
		ts := traceIDStartTime.Load()
		if ts == 0 {
			// no hit in this time range, continue with step.
			endTime = startTime
			startTime = startTime.Add(-*traceSearchStep)
			continue
		}

		// found the index entry, return its timestamp.
		return time.Unix(0, ts), nil
	}

	return time.Time{}, nil
}

// findSpansByTraceID searches for the spans of traceID backwards from now towards the zero
// timestamp, one *traceSearchStep window at a time.
//
// As soon as a window contains spans, one extra search covering *traceMaxDurationWindow
// before that window is performed, and the spans of both searches are returned together.
// It returns nil rows and a nil error if no window contains spans.
func findSpansByTraceID(ctx context.Context, cp *CommonParams, traceID string) ([]*Row, error) {
	windowEnd := time.Now()
	windowStart := windowEnd.Add(-*traceSearchStep)

	// todo: no need to search time range before retention period.
	for windowStart.UnixNano() > 0 {
		hits, err := findSpansByTraceIDAndTime(ctx, cp, traceID, windowStart, windowEnd)
		if err != nil {
			return nil, err
		}

		if len(hits) > 0 {
			// Spans found: also look behind this window by traceMaxDurationWindow, since
			// earlier spans of the same trace may fall just before the window start.
			// NOTE(review): both searches use windowStart as a boundary; if the time filter
			// is inclusive on both ends, a span stamped exactly at windowStart would be
			// returned twice - confirm.
			older, err := findSpansByTraceIDAndTime(ctx, cp, traceID, windowStart.Add(-*traceMaxDurationWindow), windowStart)
			if err != nil {
				return nil, err
			}
			return append(hits, older...), nil
		}

		// no hit in this time range, move one step back in time.
		windowEnd, windowStart = windowStart, windowStart.Add(-*traceSearchStep)
	}

	return nil, nil
}

// findSpansByTraceIDAndTime searches for the spans of traceID in the time range bounded by
// startTime and endTime.
//
// Each result row is returned as a *Row holding the row timestamp and a copy of every
// non-empty column of that row. It returns an error if the query cannot be parsed, if the
// query fails, or if a result block has no _time column.
func findSpansByTraceIDAndTime(ctx context.Context, cp *CommonParams, traceID string, startTime, endTime time.Time) ([]*Row, error) {
	// query: trace_id:traceID
	qStr := fmt.Sprintf(otelpb.TraceIDField+": %q", traceID)
	q, err := logstorage.ParseQueryAtTimestamp(qStr, endTime.UnixNano())
	if err != nil {
		return nil, fmt.Errorf("cannot parse query [%s]: %w", qStr, err)
	}

	ctxWithCancel, cancel := context.WithCancel(ctx)
	// Release the derived context on every return path, not only on the missing-column path.
	defer cancel()

	// search for trace spans and write to `rows []*Row`
	var rowsLock sync.Mutex
	var rows []*Row
	var missingTimeColumn atomic.Bool
	writeBlock := func(_ uint, db *logstorage.DataBlock) {
		if missingTimeColumn.Load() {
			return
		}

		timestamps, ok := db.GetTimestamps(nil)
		if !ok {
			missingTimeColumn.Store(true)
			cancel()
			return
		}

		// Names and values are cloned before being stored in the result.
		// NOTE(review): presumably the block memory is reused after the callback returns - confirm.
		columns := db.Columns
		clonedColumnNames := make([]string, len(columns))
		for i, c := range columns {
			clonedColumnNames[i] = strings.Clone(c.Name)
		}

		// Build the rows of this block locally so the shared lock is taken once per block
		// instead of once per row.
		blockRows := make([]*Row, 0, len(timestamps))
		for i, timestamp := range timestamps {
			fields := make([]logstorage.Field, 0, len(columns))
			for j := range columns {
				// column could be empty if this span does not contain such field.
				// only append non-empty columns.
				if columns[j].Values[i] != "" {
					fields = append(fields, logstorage.Field{
						Name:  clonedColumnNames[j],
						Value: strings.Clone(columns[j].Values[i]),
					})
				}
			}
			blockRows = append(blockRows, &Row{
				Timestamp: timestamp,
				Fields:    fields,
			})
		}

		rowsLock.Lock()
		rows = append(rows, blockRows...)
		rowsLock.Unlock()
	}

	qq := q.CloneWithTimeFilter(endTime.UnixNano(), startTime.UnixNano(), endTime.UnixNano())
	if err = vtstorage.RunQuery(ctxWithCancel, cp.TenantIDs, qq, writeBlock); err != nil {
		return nil, err
	}
	if missingTimeColumn.Load() {
		return nil, fmt.Errorf("missing _time column in the result for the query [%s]", qq)
	}
	return rows, nil
}

// checkTraceIDList removes invalid `trace_id`. It helps prevent query injection.
func checkTraceIDList(traceIDList []string) []string {
result := make([]string, 0, len(traceIDList))
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/VictoriaMetrics/VictoriaLogs v1.25.1
github.com/VictoriaMetrics/VictoriaMetrics v0.0.0-20250714222639-15242a70a79f
github.com/VictoriaMetrics/easyproto v0.1.4
github.com/VictoriaMetrics/fastcache v1.12.5
github.com/VictoriaMetrics/metrics v1.38.0
github.com/cespare/xxhash/v2 v2.3.0
github.com/google/go-cmp v0.7.0
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ github.com/VictoriaMetrics/VictoriaMetrics v0.0.0-20250714222639-15242a70a79f h1
github.com/VictoriaMetrics/VictoriaMetrics v0.0.0-20250714222639-15242a70a79f/go.mod h1:sifySYoFINFigscdMlbdMh3LVi7ktP28DAT92f0SsQA=
github.com/VictoriaMetrics/easyproto v0.1.4 h1:r8cNvo8o6sR4QShBXQd1bKw/VVLSQma/V2KhTBPf+Sc=
github.com/VictoriaMetrics/easyproto v0.1.4/go.mod h1:QlGlzaJnDfFd8Lk6Ci/fuLxfTo3/GThPs2KH23mv710=
github.com/VictoriaMetrics/fastcache v1.12.5 h1:966OX9JjqYmDAFdp3wEXLwzukiHIm+GVlZHv6B8KW3k=
github.com/VictoriaMetrics/fastcache v1.12.5/go.mod h1:K+JGPBn0sueFlLjZ8rcVM0cKkWKNElKyQXmw57QOoYI=
github.com/VictoriaMetrics/metrics v1.38.0 h1:1d0dRgVH8Nnu8dKMfisKefPC3q7gqf3/odyO0quAvyA=
github.com/VictoriaMetrics/metrics v1.38.0/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8=
github.com/VictoriaMetrics/metricsql v0.84.6 h1:r1rl05prim/r+Me4BUULaZQYXn2eZa3dnrtk+hY3X90=
github.com/VictoriaMetrics/metricsql v0.84.6/go.mod h1:d4EisFO6ONP/HIGDYTAtwrejJBBeKGQYiRl095bS4QQ=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs=
Expand Down
7 changes: 7 additions & 0 deletions lib/protoparser/opentelemetry/pb/trace_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@ package pb

// trace_fields.go contains field names when storing OTLP trace span data in VictoriaLogs.

// Special: TraceID index stream and fields.
//
// Besides the span rows, the insert path adds a small index row per trace ID to a dedicated
// stream. The query path looks up this index to find the timestamp of a trace before
// searching for its spans.
const (
// TraceIDIndexStreamName is the name of the stream field that identifies index rows.
// Its value is the partition number of the trace ID: xxhash(trace_id) % TraceIDIndexPartitionCount.
TraceIDIndexStreamName = "trace_id_idx_stream"
// TraceIDIndexFieldName is the name of the field that holds the trace ID in an index row.
TraceIDIndexFieldName = "trace_id_idx"
// TraceIDIndexPartitionCount is the modulus applied to the xxhash of a trace ID to select
// the index partition, both when writing an index row and when querying it.
TraceIDIndexPartitionCount = uint64(1024)
)

// Resource
const (
ResourceAttrPrefix = "resource_attr:"
Expand Down
22 changes: 22 additions & 0 deletions vendor/github.com/VictoriaMetrics/fastcache/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading