Skip to content

Commit 477f2eb

Browse files
committed
Support parallel parsing protocol data in the access log module
1 parent 0ae8f12 commit 477f2eb

File tree

9 files changed

+45
-24
lines changed

9 files changed

+45
-24
lines changed

CHANGES.md

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ Release Notes.
1313
* Fix the unaligned memory accesses for `upload_socket_data_buf`.
1414
* Support for connecting to the backend server over TLS without requiring `ca.pem`.
1515
* Fix missing the first socket detail event in HTTPS protocol.
16+
* Support parallel parsing protocol data in the access log module.
1617

1718
#### Bug Fixes
1819
* Fix the base image cannot run in the arm64.

configs/rover_configs.yaml

+3-1
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,10 @@ access_log:
150150
protocol_analyze:
151151
# The size of socket data buffer on each CPU
152152
per_cpu_buffer: ${ROVER_ACCESS_LOG_PROTOCOL_ANALYZE_PER_CPU_BUFFER:400KB}
153+
# The count of parallel protocol event parse
154+
parse_parallels: ${ROVER_ACCESS_LOG_PROTOCOL_ANALYZE_PARSE_PARALLELS:2}
153155
# The count of parallel protocol analyzer
154-
parallels: ${ROVER_ACCESS_LOG_PROTOCOL_ANALYZE_PARALLELS:2}
156+
analyze_parallels: ${ROVER_ACCESS_LOG_PROTOCOL_ANALYZE_PARALLELS:2}
155157
# The size of per paralleled analyzer queue
156158
queue_size: ${ROVER_ACCESS_LOG_PROTOCOL_ANALYZE_QUEUE_SIZE:5000}
157159

pkg/accesslog/collector/connection.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -77,12 +77,12 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx *common.AccessLogContext
7777
c.eventQueue = btf.NewEventQueue(ctx.Config.ConnectionAnalyze.Parallels, ctx.Config.ConnectionAnalyze.QueueSize, func(num int) btf.PartitionContext {
7878
return newConnectionPartitionContext(ctx, track)
7979
})
80-
c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue, int(perCPUBufferSize), func() interface{} {
80+
c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue, int(perCPUBufferSize), 1, func() interface{} {
8181
return &events.SocketConnectEvent{}
8282
}, func(data interface{}) string {
8383
return fmt.Sprintf("%d", data.(*events.SocketConnectEvent).ConID)
8484
})
85-
c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue, int(perCPUBufferSize), func() interface{} {
85+
c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue, int(perCPUBufferSize), 1, func() interface{} {
8686
return &events.SocketCloseEvent{}
8787
}, func(data interface{}) string {
8888
return fmt.Sprintf("%d", data.(*events.SocketCloseEvent).ConnectionID)

pkg/accesslog/collector/protocols/queue.go

+18-13
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,11 @@ func NewAnalyzeQueue(ctx *common.AccessLogContext) (*AnalyzeQueue, error) {
6262
if int(perCPUBufferSize) < os.Getpagesize() {
6363
return nil, fmt.Errorf("the cpu buffer must bigger than %dB", os.Getpagesize())
6464
}
65-
if ctx.Config.ProtocolAnalyze.Parallels < 1 {
66-
return nil, fmt.Errorf("the parallels cannot be small than 1")
65+
if ctx.Config.ProtocolAnalyze.AnalyzeParallels < 1 {
66+
return nil, fmt.Errorf("the analyze parallels cannot be small than 1")
67+
}
68+
if ctx.Config.ProtocolAnalyze.ParseParallels < 1 {
69+
return nil, fmt.Errorf("the parse parallels cannot be small than 1")
6770
}
6871
if ctx.Config.ProtocolAnalyze.QueueSize < 1 {
6972
return nil, fmt.Errorf("the queue size be small than 1")
@@ -85,20 +88,22 @@ func NewAnalyzeQueue(ctx *common.AccessLogContext) (*AnalyzeQueue, error) {
8588
}
8689

8790
func (q *AnalyzeQueue) Start(ctx context.Context) {
88-
q.eventQueue = btf.NewEventQueue(q.context.Config.ProtocolAnalyze.Parallels, q.context.Config.ProtocolAnalyze.QueueSize,
91+
q.eventQueue = btf.NewEventQueue(q.context.Config.ProtocolAnalyze.AnalyzeParallels, q.context.Config.ProtocolAnalyze.QueueSize,
8992
func(num int) btf.PartitionContext {
9093
return NewPartitionContext(q.context, num, q.supportAnalyzers(q.context))
9194
})
92-
q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetailDataQueue, int(q.perCPUBuffer), func() interface{} {
93-
return q.detailSupplier()
94-
}, func(data interface{}) string {
95-
return fmt.Sprintf("%d", data.(events.SocketDetail).GetConnectionID())
96-
})
97-
q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadEventQueue, int(q.perCPUBuffer), func() interface{} {
98-
return &events.SocketDataUploadEvent{}
99-
}, func(data interface{}) string {
100-
return fmt.Sprintf("%d", data.(*events.SocketDataUploadEvent).ConnectionID)
101-
})
95+
q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetailDataQueue, int(q.perCPUBuffer),
96+
q.context.Config.ProtocolAnalyze.ParseParallels, func() interface{} {
97+
return q.detailSupplier()
98+
}, func(data interface{}) string {
99+
return fmt.Sprintf("%d", data.(events.SocketDetail).GetConnectionID())
100+
})
101+
q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadEventQueue, int(q.perCPUBuffer),
102+
q.context.Config.ProtocolAnalyze.ParseParallels, func() interface{} {
103+
return &events.SocketDataUploadEvent{}
104+
}, func(data interface{}) string {
105+
return fmt.Sprintf("%d", data.(*events.SocketDataUploadEvent).ConnectionID)
106+
})
102107

103108
q.eventQueue.Start(ctx, q.context.BPF.Linker)
104109
}

pkg/accesslog/common/config.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ type ConnectionAnalyzeConfig struct {
4343

4444
type ProtocolAnalyzeConfig struct {
4545
PerCPUBufferSize string `mapstructure:"per_cpu_buffer"`
46-
Parallels int `mapstructure:"parallels"`
46+
ParseParallels int `mapstructure:"parse_parallels"`
47+
AnalyzeParallels int `mapstructure:"analyze_parallels"`
4748
QueueSize int `mapstructure:"queue_size"`
4849
}
4950

pkg/profiling/continuous/checker/bpf/network/network.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ func startBPFIfNeed() error {
147147
n.ReceiveBufferEvent(event)
148148
}
149149
})
150-
bpfLinker.ReadEventAsyncWithBufferSize(bpf.SocketBufferSendQueue, reader.Read, os.Getpagesize()*100, reader.BufferDataBPFSupplier)
150+
bpfLinker.ReadEventAsyncWithBufferSize(bpf.SocketBufferSendQueue, reader.Read, os.Getpagesize()*100, 1, reader.BufferDataBPFSupplier)
151151

152152
if err := bpfLinker.HasError(); err != nil {
153153
_ = bpfLinker.Close()

pkg/profiling/task/network/analyze/layer7/events.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,14 @@ func (l *Listener) initSocketDataQueue(parallels, queueSize int, config *profili
3636

3737
func (l *Listener) startSocketData(ctx context.Context, bpfLoader *bpf.Loader) {
3838
// socket buffer data
39-
l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadEventQueue, l.protocolPerCPUBuffer, func() interface{} {
39+
l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadEventQueue, l.protocolPerCPUBuffer, 1, func() interface{} {
4040
return &analyzeBase.SocketDataUploadEvent{}
4141
}, func(data interface{}) string {
4242
return data.(*analyzeBase.SocketDataUploadEvent).GenerateConnectionID()
4343
})
4444

4545
// socket detail
46-
l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDetailDataQueue, l.protocolPerCPUBuffer, func() interface{} {
46+
l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDetailDataQueue, l.protocolPerCPUBuffer, 1, func() interface{} {
4747
return &analyzeBase.SocketDetailEvent{}
4848
}, func(data interface{}) string {
4949
return data.(*analyzeBase.SocketDetailEvent).GenerateConnectionID()

pkg/tools/btf/linker.go

+12-2
Original file line numberDiff line numberDiff line change
@@ -156,17 +156,27 @@ func (m *Linker) AddTracePoint(sys, name string, p *ebpf.Program) {
156156
}
157157

158158
func (m *Linker) ReadEventAsync(emap *ebpf.Map, bufReader RingBufferReader, dataSupplier func() interface{}) {
159-
m.ReadEventAsyncWithBufferSize(emap, bufReader, os.Getpagesize(), dataSupplier)
159+
m.ReadEventAsyncWithBufferSize(emap, bufReader, os.Getpagesize(), 1, dataSupplier)
160160
}
161161

162-
func (m *Linker) ReadEventAsyncWithBufferSize(emap *ebpf.Map, bufReader RingBufferReader, perCPUBuffer int, dataSupplier func() interface{}) {
162+
func (m *Linker) ReadEventAsyncWithBufferSize(emap *ebpf.Map, bufReader RingBufferReader, perCPUBuffer, parallels int, dataSupplier func() interface{}) {
163163
rd, err := perf.NewReader(emap, perCPUBuffer)
164164
if err != nil {
165165
m.errors = multierror.Append(m.errors, fmt.Errorf("open ring buffer error: %v", err))
166166
return
167167
}
168+
if parallels < 1 {
169+
m.errors = multierror.Append(m.errors, fmt.Errorf("parallels rading count must bigger than 1"))
170+
return
171+
}
168172
m.closers = append(m.closers, rd)
169173

174+
for i := 0; i < parallels; i++ {
175+
m.asyncReadEvent(rd, emap, dataSupplier, bufReader)
176+
}
177+
}
178+
179+
func (m *Linker) asyncReadEvent(rd *perf.Reader, emap *ebpf.Map, dataSupplier func() interface{}, bufReader RingBufferReader) {
170180
go func() {
171181
for {
172182
record, err := rd.Read()

pkg/tools/btf/queue.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type mapReceiver struct {
4343
perCPUBuffer int
4444
dataSupplier func() interface{}
4545
router func(data interface{}) string
46+
parallels int
4647
}
4748

4849
func NewEventQueue(partitionCount, sizePerPartition int, contextGenerator func(partitionNum int) PartitionContext) *EventQueue {
@@ -53,13 +54,14 @@ func NewEventQueue(partitionCount, sizePerPartition int, contextGenerator func(p
5354
return &EventQueue{count: partitionCount, partitions: partitions}
5455
}
5556

56-
func (e *EventQueue) RegisterReceiver(emap *ebpf.Map, perCPUBufferSize int, dataSupplier func() interface{},
57+
func (e *EventQueue) RegisterReceiver(emap *ebpf.Map, perCPUBufferSize, parallels int, dataSupplier func() interface{},
5758
routeGenerator func(data interface{}) string) {
5859
e.receivers = append(e.receivers, &mapReceiver{
5960
emap: emap,
6061
perCPUBuffer: perCPUBufferSize,
6162
dataSupplier: dataSupplier,
6263
router: routeGenerator,
64+
parallels: parallels,
6365
})
6466
}
6567

@@ -92,7 +94,7 @@ func (e *EventQueue) start0(ctx context.Context, linker *Linker) {
9294
func(receiver *mapReceiver) {
9395
linker.ReadEventAsyncWithBufferSize(receiver.emap, func(data interface{}) {
9496
e.routerTransformer(data, receiver.router)
95-
}, receiver.perCPUBuffer, receiver.dataSupplier)
97+
}, receiver.perCPUBuffer, r.parallels, receiver.dataSupplier)
9698
}(r)
9799
}
98100

0 commit comments

Comments
 (0)