Skip to content

Commit

Permalink
Improve the performance of access log module (#171)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrproliu authored Dec 23, 2024
1 parent 023d6c2 commit 78fb9c4
Show file tree
Hide file tree
Showing 31 changed files with 772 additions and 434 deletions.
5 changes: 4 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ Release Notes.
* Support parallel parsing protocol data in the access log module.
* Upgrade Go library to `1.22`, eBPF library to `0.16.0`.
* Reduce missing details issue in the access log module.
* Introduce ringbuf queue to improve performance in the access log module.
* Improve HTTP/1.x protocol parsing strategy to encase missing data.
* Add gRPC sender to sending the access log to the backend.
* Add warning log when the event queue almost full in the access log module.
* Reduce unessential `conntrack` query when detect new connection.

#### Bug Fixes
* Fix the base image cannot run in the arm64.
Expand Down
6 changes: 4 additions & 2 deletions bpf/accesslog/common/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "data_args.h"
#include "socket_opts.h"
#include "queue.h"
#include "socket_data.h"

// syscall:connect
struct connect_args_t {
Expand Down Expand Up @@ -107,7 +108,7 @@ struct socket_connect_event_t {
__u64 conntrack_upstream_iph;
__u32 conntrack_upstream_port;
};
DATA_QUEUE(socket_connection_event_queue, 1024 * 1024);
DATA_QUEUE(socket_connection_event_queue);

// active connection cached into the hashmap
// if connection closed, then deleted
Expand Down Expand Up @@ -159,7 +160,7 @@ struct socket_close_event_t {
// close success
__u32 success;
};
DATA_QUEUE(socket_close_event_queue, 1024 * 1024);
DATA_QUEUE(socket_close_event_queue);

static __inline bool family_should_trace(const __u32 family) {
return family != AF_UNKNOWN && family != AF_INET && family != AF_INET6 ? false : true;
Expand Down Expand Up @@ -303,6 +304,7 @@ static __inline void submit_connection_when_not_exists(void *ctx, __u64 id, stru
}

static __inline void notify_close_connection(void* ctx, __u64 conid, struct active_connection_t* con, __u64 start_time, __u64 end_time, int ret) {
bpf_map_delete_elem(&socket_data_last_id_map, &conid);
struct socket_close_event_t *close_event;
close_event = rover_reserve_buf(&socket_close_event_queue, sizeof(*close_event));
if (close_event == NULL) {
Expand Down
2 changes: 1 addition & 1 deletion bpf/accesslog/syscalls/transfer.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ struct socket_detail_t {
__u8 ssl;
};

DATA_QUEUE(socket_detail_queue, 1024 * 1024);
DATA_QUEUE(socket_detail_queue);

static __always_inline void process_write_data(void *ctx, __u64 id, struct sock_data_args_t *args, ssize_t bytes_count,
__u32 data_direction, const bool vecs, __u8 func_name, bool ssl) {
Expand Down
26 changes: 5 additions & 21 deletions bpf/include/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@

#include "api.h"

#define DATA_QUEUE(name, size) \
struct { \
__uint(type, BPF_MAP_TYPE_RINGBUF); \
__uint(max_entries, size); \
} name SEC(".maps"); \
const void *rover_data_queue_##name __attribute__((unused));
#define DATA_QUEUE(name) \
struct { \
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);\
} name SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
Expand All @@ -36,26 +34,12 @@ struct {
static __always_inline void *rover_reserve_buf(void *map, __u64 size) {
static const int zero = 0;

if (bpf_core_enum_value_exists(enum bpf_func_id,
BPF_FUNC_ringbuf_reserve))
return bpf_ringbuf_reserve(map, size, 0);

return bpf_map_lookup_elem(&rover_data_heap, &zero);
}

static __always_inline void rover_discard_buf(void *buf)
{
if (bpf_core_enum_value_exists(enum bpf_func_id,
BPF_FUNC_ringbuf_discard))
bpf_ringbuf_discard(buf, 0);
static __always_inline void rover_discard_buf(void *buf) {
}

static __always_inline long rover_submit_buf(void *ctx, void *map, void *buf, __u64 size) {
if (bpf_core_enum_value_exists(enum bpf_func_id,
BPF_FUNC_ringbuf_submit)) {
bpf_ringbuf_submit(buf, 0);
return 0;
}

return bpf_perf_event_output(ctx, map, BPF_F_CURRENT_CPU, buf, size);
}
29 changes: 28 additions & 1 deletion bpf/include/socket_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ struct socket_data_upload_event {
__u64 conid;
__u64 randomid;
__u64 data_id;
__u64 prev_data_id;
__u64 total_size;
char buffer[MAX_TRANSMIT_SOCKET_READ_LENGTH];
};
Expand All @@ -44,7 +45,7 @@ struct {
__type(value, struct socket_data_upload_event);
__uint(max_entries, 1);
} socket_data_upload_event_per_cpu_map SEC(".maps");
DATA_QUEUE(socket_data_upload_queue, 1024 * 1024);
DATA_QUEUE(socket_data_upload_queue);

struct socket_data_sequence_t {
__u64 data_id;
Expand Down Expand Up @@ -81,6 +82,7 @@ struct upload_data_args {
__u64 random_id;

__u64 socket_data_id;
__u64 prev_socket_data_id;
struct iovec *socket_data_iovec;
size_t socket_data_iovlen;
ssize_t bytes_count;
Expand Down Expand Up @@ -129,11 +131,13 @@ static __always_inline void __upload_socket_data_with_buffer(void *ctx, __u8 ind
socket_data_event->randomid = args->random_id;
socket_data_event->total_size = args->bytes_count;
socket_data_event->data_id = args->socket_data_id;
socket_data_event->prev_data_id = args->prev_socket_data_id;

socket_data_event->sequence = index;
socket_data_event->data_len = size;
socket_data_event->finished = is_finished;
socket_data_event->have_reduce_after_chunk = have_reduce_after_chunk;
asm volatile("%[size] &= 0x7ff;\n" ::[size] "+r"(size) :);
bpf_probe_read(&socket_data_event->buffer, size, buf);
rover_submit_buf(ctx, &socket_data_upload_queue, socket_data_event, sizeof(*socket_data_event));
}
Expand Down Expand Up @@ -208,15 +212,38 @@ static __always_inline void upload_socket_data_iov(void *ctx, struct iovec* iov,
UPLOAD_PER_SOCKET_DATA_IOV();
}

struct socket_data_last_id_t {
__u64 random_id;
__u64 socket_data_id;
};
struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__uint(max_entries, 10000);
__type(key, __u64);
__type(value, struct socket_data_last_id_t);
} socket_data_last_id_map SEC(".maps");

static __inline void upload_socket_data(void *ctx, struct upload_data_args *args) {
// must have protocol and ssl must same(plain)
// if the connection data is needs to skip upload, then skip
if (args->connection_protocol == CONNECTION_PROTOCOL_UNKNOWN || args->connection_ssl != args->socket_data_ssl || args->connection_skip_data_upload == 1) {
return;
}
struct socket_data_last_id_t *latest = bpf_map_lookup_elem(&socket_data_last_id_map, &args->con_id);
args->prev_socket_data_id = 0;
if (latest != NULL && latest->random_id == args->random_id) {
args->prev_socket_data_id = latest->socket_data_id;
}
if (args->socket_data_buf != NULL) {
upload_socket_data_buf(ctx, args->socket_data_buf, args->bytes_count, args, args->socket_ssl_buffer_force_unfinished);
} else if (args->socket_data_iovec != NULL) {
upload_socket_data_iov(ctx, args->socket_data_iovec, args->socket_data_iovlen, args->bytes_count, args);
}

if (latest == NULL || latest->socket_data_id != args->socket_data_id) {
struct socket_data_last_id_t data = {};
data.random_id = args->random_id;
data.socket_data_id = args->socket_data_id;
bpf_map_update_elem(&socket_data_last_id_map, &args->con_id, &data, BPF_ANY);
}
}
2 changes: 1 addition & 1 deletion pkg/accesslog/collector/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx *common.AccessLogContext
if err != nil {
connectionLogger.Warnf("cannot create the connection tracker, %v", err)
}
c.eventQueue = btf.NewEventQueue(ctx.Config.ConnectionAnalyze.AnalyzeParallels,
c.eventQueue = btf.NewEventQueue("connection resolver", ctx.Config.ConnectionAnalyze.AnalyzeParallels,
ctx.Config.ConnectionAnalyze.QueueSize, func(num int) btf.PartitionContext {
return newConnectionPartitionContext(ctx, track)
})
Expand Down
29 changes: 21 additions & 8 deletions pkg/accesslog/collector/protocols/http1.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,6 @@ func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelpe
}

messageType, err := p.reader.IdentityMessageType(buf)
log.Debugf("ready to reading message type, messageType: %v, buf: %p, data id: %d, "+
"connection ID: %d, random ID: %d, error: %v", messageType, buf, buf.Position().DataID(),
metrics.ConnectionID, metrics.RandomID, err)
if err != nil {
http1Log.Debugf("failed to identity message type, %v", err)
if buf.SkipCurrentElement() {
Expand All @@ -115,6 +112,9 @@ func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelpe
metrics.ConnectionID, metrics.RandomID, buf.Position().DataID(), err)
}

http1Log.Debugf("readed message, messageType: %v, buf: %p, data id: %d, "+
"connection ID: %d, random ID: %d, metrics : %p, handle result: %d",
messageType, buf, buf.Position().DataID(), metrics.ConnectionID, metrics.RandomID, metrics, result)
finishReading := false
switch result {
case enums.ParseResultSuccess:
Expand Down Expand Up @@ -149,13 +149,13 @@ func (p *HTTP1Protocol) handleRequest(metrics *HTTP1Metrics, buf *buffer.Buffer)
}

func (p *HTTP1Protocol) handleResponse(metrics *HTTP1Metrics, b *buffer.Buffer) (enums.ParseResult, error) {
firstRequest := metrics.halfRequests.Front()
if firstRequest == nil {
log.Debugf("cannot found request for response, skip response, connection ID: %d, random ID: %d",
metrics.ConnectionID, metrics.RandomID)
request := metrics.findMatchesRequest(b.Position().DataID(), b.Position().PrevDataID())
if request == nil {
log.Debugf("cannot found request for response, skip response, connection ID: %d, random ID: %d, "+
"required prev data id: %d, current data id: %d",
metrics.ConnectionID, metrics.RandomID, b.Position().PrevDataID(), b.Position().DataID())
return enums.ParseResultSkipPackage, nil
}
request := metrics.halfRequests.Remove(firstRequest).(*reader.Request)

// parsing response
response, result, err := p.reader.ReadResponse(request, b, true)
Expand Down Expand Up @@ -286,3 +286,16 @@ func (m *HTTP1Metrics) appendRequestToList(req *reader.Request) {
m.halfRequests.PushBack(req)
}
}

func (m *HTTP1Metrics) findMatchesRequest(currentDataID, prevDataID uint64) *reader.Request {
for element := m.halfRequests.Front(); element != nil; element = element.Next() {
req := element.Value.(*reader.Request)
// if the tail data id of request is equals to the prev data id of response
// or tail request data id+1==first response data id, then return the request
if uint64(req.MaxDataID()) == prevDataID || uint64(req.MaxDataID()+1) == currentDataID {
m.halfRequests.Remove(element)
return req
}
}
return nil
}
22 changes: 12 additions & 10 deletions pkg/accesslog/collector/protocols/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ func NewAnalyzeQueue(ctx *common.AccessLogContext) (*AnalyzeQueue, error) {
}

func (q *AnalyzeQueue) Start(ctx context.Context) {
q.eventQueue = btf.NewEventQueue(q.context.Config.ProtocolAnalyze.AnalyzeParallels, q.context.Config.ProtocolAnalyze.QueueSize,
q.eventQueue = btf.NewEventQueue("socket data analyzer",
q.context.Config.ProtocolAnalyze.AnalyzeParallels, q.context.Config.ProtocolAnalyze.QueueSize,
func(num int) btf.PartitionContext {
return NewPartitionContext(q.context, num, q.supportAnalyzers(q.context))
})
Expand Down Expand Up @@ -128,12 +129,13 @@ type PartitionContext struct {
func newPartitionConnection(protocolMgr *ProtocolManager, conID, randomID uint64,
protocol enums.ConnectionProtocol, currentDataID uint64) *PartitionConnection {
connection := &PartitionConnection{
connectionID: conID,
randomID: randomID,
dataBuffers: make(map[enums.ConnectionProtocol]*buffer.Buffer),
protocol: make(map[enums.ConnectionProtocol]uint64),
protocolAnalyzer: make(map[enums.ConnectionProtocol]Protocol),
protocolMetrics: make(map[enums.ConnectionProtocol]ProtocolMetrics),
connectionID: conID,
randomID: randomID,
dataBuffers: make(map[enums.ConnectionProtocol]*buffer.Buffer),
protocol: make(map[enums.ConnectionProtocol]uint64),
protocolAnalyzer: make(map[enums.ConnectionProtocol]Protocol),
protocolMetrics: make(map[enums.ConnectionProtocol]ProtocolMetrics),
lastCheckCloseTime: time.Now(),
}
connection.appendProtocolIfNeed(protocolMgr, conID, randomID, protocol, currentDataID)
return connection
Expand Down Expand Up @@ -173,7 +175,6 @@ func (p *PartitionContext) OnConnectionClose(event *events.SocketCloseEvent, clo
}
connection := conn.(*PartitionConnection)
connection.closeCallback = closeCallback
connection.closed = true
log.Debugf("receive the connection close event and mark is closable, connection ID: %d, random ID: %d, partition number: %d",
event.GetConnectionID(), event.GetRandomID(), p.partitionNum)
}
Expand Down Expand Up @@ -227,8 +228,9 @@ func (p *PartitionContext) Consume(data interface{}) {
connection.AppendDetail(p.context, event)
case *events.SocketDataUploadEvent:
pid, _ := events.ParseConnectionID(event.ConnectionID)
log.Debugf("receive the socket data event, connection ID: %d, random ID: %d, pid: %d, data id: %d, sequence: %d, protocol: %d",
event.ConnectionID, event.RandomID, pid, event.DataID0, event.Sequence0, event.Protocol0)
log.Debugf("receive the socket data event, connection ID: %d, random ID: %d, pid: %d, prev data id: %d, "+
"data id: %d, sequence: %d, protocol: %d",
event.ConnectionID, event.RandomID, pid, event.PrevDataID0, event.DataID0, event.Sequence0, event.Protocol0)
connection := p.getConnectionContext(event.ConnectionID, event.RandomID, event.Protocol0, event.DataID0)
connection.AppendData(event)
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/accesslog/collector/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package collector

import (
"sync"

"github.com/apache/skywalking-rover/pkg/accesslog/common"
"github.com/apache/skywalking-rover/pkg/logger"
"github.com/apache/skywalking-rover/pkg/module"
Expand All @@ -33,6 +35,7 @@ type TLSCollector struct {
context *common.AccessLogContext
monitoredProcesses map[int32]bool
linker *btf.Linker
mutex sync.Mutex
}

func NewTLSCollector() *TLSCollector {
Expand All @@ -57,6 +60,14 @@ func (c *TLSCollector) Stop() {
}

func (c *TLSCollector) OnNewProcessMonitoring(pid int32) {
go func() {
c.addProcess(pid)
}()
}

func (c *TLSCollector) addProcess(pid int32) {
c.mutex.Lock()
defer c.mutex.Unlock()
if _, ok := c.monitoredProcesses[pid]; ok {
return
}
Expand All @@ -83,5 +94,7 @@ func (c *TLSCollector) OnNewProcessMonitoring(pid int32) {
}

func (c *TLSCollector) OnProcessRemoved(pid int32) {
c.mutex.Lock()
defer c.mutex.Unlock()
delete(c.monitoredProcesses, pid)
}
Loading

0 comments on commit 78fb9c4

Please sign in to comment.