diff --git a/CHANGES.md b/CHANGES.md index 4eab9471..3392d50b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -13,6 +13,7 @@ Release Notes. * Fix the unaligned memory accesses for `upload_socket_data_buf`. * Support for connecting to the backend server over TLS without requiring `ca.pem`. * Fix missing the first socket detail event in HTTPS protocol. +* Support parallel parsing protocol data in the access log module. #### Bug Fixes * Fix the base image cannot run in the arm64. diff --git a/configs/rover_configs.yaml b/configs/rover_configs.yaml index ce39111c..8f7ba6e6 100644 --- a/configs/rover_configs.yaml +++ b/configs/rover_configs.yaml @@ -150,8 +150,10 @@ access_log: protocol_analyze: # The size of socket data buffer on each CPU per_cpu_buffer: ${ROVER_ACCESS_LOG_PROTOCOL_ANALYZE_PER_CPU_BUFFER:400KB} + # The count of parallel protocol event parse + parse_parallels: ${ROVER_ACCESS_LOG_PROTOCOL_ANALYZE_PARSE_PARALLELS:2} # The count of parallel protocol analyzer - parallels: ${ROVER_ACCESS_LOG_PROTOCOL_ANALYZE_PARALLELS:2} + analyze_parallels: ${ROVER_ACCESS_LOG_PROTOCOL_ANALYZE_PARALLELS:2} # The size of per paralleled analyzer queue queue_size: ${ROVER_ACCESS_LOG_PROTOCOL_ANALYZE_QUEUE_SIZE:5000} diff --git a/pkg/accesslog/collector/connection.go b/pkg/accesslog/collector/connection.go index c1253975..8e8aba1f 100644 --- a/pkg/accesslog/collector/connection.go +++ b/pkg/accesslog/collector/connection.go @@ -77,12 +77,12 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx *common.AccessLogContext c.eventQueue = btf.NewEventQueue(ctx.Config.ConnectionAnalyze.Parallels, ctx.Config.ConnectionAnalyze.QueueSize, func(num int) btf.PartitionContext { return newConnectionPartitionContext(ctx, track) }) - c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue, int(perCPUBufferSize), func() interface{} { + c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue, int(perCPUBufferSize), 1, func() interface{} { return &events.SocketConnectEvent{} }, func(data interface{}) string { return fmt.Sprintf("%d", data.(*events.SocketConnectEvent).ConID) }) - c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue, int(perCPUBufferSize), func() interface{} { + c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue, int(perCPUBufferSize), 1, func() interface{} { return &events.SocketCloseEvent{} }, func(data interface{}) string { return fmt.Sprintf("%d", data.(*events.SocketCloseEvent).ConnectionID) diff --git a/pkg/accesslog/collector/protocols/queue.go b/pkg/accesslog/collector/protocols/queue.go index ad665841..68708fc1 100644 --- a/pkg/accesslog/collector/protocols/queue.go +++ b/pkg/accesslog/collector/protocols/queue.go @@ -62,8 +62,11 @@ func NewAnalyzeQueue(ctx *common.AccessLogContext) (*AnalyzeQueue, error) { if int(perCPUBufferSize) < os.Getpagesize() { return nil, fmt.Errorf("the cpu buffer must bigger than %dB", os.Getpagesize()) } - if ctx.Config.ProtocolAnalyze.Parallels < 1 { - return nil, fmt.Errorf("the parallels cannot be small than 1") + if ctx.Config.ProtocolAnalyze.AnalyzeParallels < 1 { + return nil, fmt.Errorf("the analyze parallels cannot be small than 1") + } + if ctx.Config.ProtocolAnalyze.ParseParallels < 1 { + return nil, fmt.Errorf("the parse parallels cannot be small than 1") } if ctx.Config.ProtocolAnalyze.QueueSize < 1 { return nil, fmt.Errorf("the queue size be small than 1") @@ -85,20 +88,22 @@ func NewAnalyzeQueue(ctx *common.AccessLogContext) (*AnalyzeQueue, error) { } func (q *AnalyzeQueue) Start(ctx context.Context) { - q.eventQueue = btf.NewEventQueue(q.context.Config.ProtocolAnalyze.Parallels, q.context.Config.ProtocolAnalyze.QueueSize, + q.eventQueue = btf.NewEventQueue(q.context.Config.ProtocolAnalyze.AnalyzeParallels, q.context.Config.ProtocolAnalyze.QueueSize, func(num int) btf.PartitionContext { return NewPartitionContext(q.context, num, q.supportAnalyzers(q.context)) }) - q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetailDataQueue, int(q.perCPUBuffer), func() interface{} { - return q.detailSupplier() - }, func(data interface{}) string { - return fmt.Sprintf("%d", data.(events.SocketDetail).GetConnectionID()) - }) - q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadEventQueue, int(q.perCPUBuffer), func() interface{} { - return &events.SocketDataUploadEvent{} - }, func(data interface{}) string { - return fmt.Sprintf("%d", data.(*events.SocketDataUploadEvent).ConnectionID) - }) + q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetailDataQueue, int(q.perCPUBuffer), + q.context.Config.ProtocolAnalyze.ParseParallels, func() interface{} { + return q.detailSupplier() + }, func(data interface{}) string { + return fmt.Sprintf("%d", data.(events.SocketDetail).GetConnectionID()) + }) + q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadEventQueue, int(q.perCPUBuffer), + q.context.Config.ProtocolAnalyze.ParseParallels, func() interface{} { + return &events.SocketDataUploadEvent{} + }, func(data interface{}) string { + return fmt.Sprintf("%d", data.(*events.SocketDataUploadEvent).ConnectionID) + }) q.eventQueue.Start(ctx, q.context.BPF.Linker) } diff --git a/pkg/accesslog/common/config.go b/pkg/accesslog/common/config.go index f1aab943..1ef0aeb8 100644 --- a/pkg/accesslog/common/config.go +++ b/pkg/accesslog/common/config.go @@ -43,7 +43,8 @@ type ConnectionAnalyzeConfig struct { type ProtocolAnalyzeConfig struct { PerCPUBufferSize string `mapstructure:"per_cpu_buffer"` - Parallels int `mapstructure:"parallels"` + ParseParallels int `mapstructure:"parse_parallels"` + AnalyzeParallels int `mapstructure:"analyze_parallels"` QueueSize int `mapstructure:"queue_size"` } diff --git a/pkg/profiling/continuous/checker/bpf/network/network.go b/pkg/profiling/continuous/checker/bpf/network/network.go index be6300c9..ba03f211 100644 --- a/pkg/profiling/continuous/checker/bpf/network/network.go +++ b/pkg/profiling/continuous/checker/bpf/network/network.go @@ -147,7 +147,7 @@ func startBPFIfNeed() error { n.ReceiveBufferEvent(event) } }) - bpfLinker.ReadEventAsyncWithBufferSize(bpf.SocketBufferSendQueue, reader.Read, os.Getpagesize()*100, reader.BufferDataBPFSupplier) + bpfLinker.ReadEventAsyncWithBufferSize(bpf.SocketBufferSendQueue, reader.Read, os.Getpagesize()*100, 1, reader.BufferDataBPFSupplier) if err := bpfLinker.HasError(); err != nil { _ = bpfLinker.Close() diff --git a/pkg/profiling/task/network/analyze/layer7/events.go b/pkg/profiling/task/network/analyze/layer7/events.go index 2a54579f..dab74d19 100644 --- a/pkg/profiling/task/network/analyze/layer7/events.go +++ b/pkg/profiling/task/network/analyze/layer7/events.go @@ -36,14 +36,14 @@ func (l *Listener) initSocketDataQueue(parallels, queueSize int, config *profili func (l *Listener) startSocketData(ctx context.Context, bpfLoader *bpf.Loader) { // socket buffer data - l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadEventQueue, l.protocolPerCPUBuffer, func() interface{} { + l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadEventQueue, l.protocolPerCPUBuffer, 1, func() interface{} { return &analyzeBase.SocketDataUploadEvent{} }, func(data interface{}) string { return data.(*analyzeBase.SocketDataUploadEvent).GenerateConnectionID() }) // socket detail - l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDetailDataQueue, l.protocolPerCPUBuffer, func() interface{} { + l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDetailDataQueue, l.protocolPerCPUBuffer, 1, func() interface{} { return &analyzeBase.SocketDetailEvent{} }, func(data interface{}) string { return data.(*analyzeBase.SocketDetailEvent).GenerateConnectionID() diff --git a/pkg/tools/btf/linker.go b/pkg/tools/btf/linker.go index 148222ab..f663737a 100644 --- a/pkg/tools/btf/linker.go +++ b/pkg/tools/btf/linker.go @@ -156,17 +156,28 @@ func (m *Linker) AddTracePoint(sys, name string, p *ebpf.Program) { } func (m *Linker) ReadEventAsync(emap *ebpf.Map, bufReader RingBufferReader, dataSupplier func() interface{}) { - m.ReadEventAsyncWithBufferSize(emap, bufReader, os.Getpagesize(), dataSupplier) + m.ReadEventAsyncWithBufferSize(emap, bufReader, os.Getpagesize(), 1, dataSupplier) } -func (m *Linker) ReadEventAsyncWithBufferSize(emap *ebpf.Map, bufReader RingBufferReader, perCPUBuffer int, dataSupplier func() interface{}) { +func (m *Linker) ReadEventAsyncWithBufferSize(emap *ebpf.Map, bufReader RingBufferReader, perCPUBuffer, + parallels int, dataSupplier func() interface{}) { rd, err := perf.NewReader(emap, perCPUBuffer) if err != nil { m.errors = multierror.Append(m.errors, fmt.Errorf("open ring buffer error: %v", err)) return } + if parallels < 1 { + m.errors = multierror.Append(m.errors, fmt.Errorf("parallels rading count must bigger than 1")) + return + } m.closers = append(m.closers, rd) + for i := 0; i < parallels; i++ { + m.asyncReadEvent(rd, emap, dataSupplier, bufReader) + } +} + +func (m *Linker) asyncReadEvent(rd *perf.Reader, emap *ebpf.Map, dataSupplier func() interface{}, bufReader RingBufferReader) { go func() { for { record, err := rd.Read() diff --git a/pkg/tools/btf/queue.go b/pkg/tools/btf/queue.go index dc0b7cb3..813979d5 100644 --- a/pkg/tools/btf/queue.go +++ b/pkg/tools/btf/queue.go @@ -43,6 +43,7 @@ type mapReceiver struct { perCPUBuffer int dataSupplier func() interface{} router func(data interface{}) string + parallels int } func NewEventQueue(partitionCount, sizePerPartition int, contextGenerator func(partitionNum int) PartitionContext) *EventQueue { @@ -53,13 +54,14 @@ func NewEventQueue(partitionCount, sizePerPartition int, contextGenerator func(p return &EventQueue{count: partitionCount, partitions: partitions} } -func (e *EventQueue) RegisterReceiver(emap *ebpf.Map, perCPUBufferSize int, dataSupplier func() interface{}, +func (e *EventQueue) RegisterReceiver(emap *ebpf.Map, perCPUBufferSize, parallels int, dataSupplier func() interface{}, routeGenerator func(data interface{}) string) { e.receivers = append(e.receivers, &mapReceiver{ emap: emap, perCPUBuffer: perCPUBufferSize, dataSupplier: dataSupplier, router: routeGenerator, + parallels: parallels, }) } @@ -92,7 +94,7 @@ func (e *EventQueue) start0(ctx context.Context, linker *Linker) { func(receiver *mapReceiver) { linker.ReadEventAsyncWithBufferSize(receiver.emap, func(data interface{}) { e.routerTransformer(data, receiver.router) - }, receiver.perCPUBuffer, receiver.dataSupplier) + }, receiver.perCPUBuffer, r.parallels, receiver.dataSupplier) }(r) }