Skip to content

Commit

Permalink
Reduce handle connect event time in the access log module (#174)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrproliu authored Dec 30, 2024
1 parent b5dadaf commit 935aece
Show file tree
Hide file tree
Showing 31 changed files with 399 additions and 433 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Release Notes.
* Add warning log when the event queue almost full in the access log module.
* Reduce unessential `conntrack` query when detect new connection.
* Reduce CPU and memory usage in the access log module.
* Reduce handle connection event time in the access log module.

#### Bug Fixes
* Fix the base image cannot run in the arm64.
Expand Down
3 changes: 2 additions & 1 deletion bpf/accesslog/common/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
// syscall:connect
struct connect_args_t {
__u32 fd;
__u32 fix;
__u32 has_remote;
struct sockaddr* addr;
struct sock *sock;
__u64 start_nacs;
Expand Down Expand Up @@ -305,6 +305,7 @@ static __inline void submit_connection_when_not_exists(void *ctx, __u64 id, stru

static __inline void notify_close_connection(void* ctx, __u64 conid, struct active_connection_t* con, __u64 start_time, __u64 end_time, int ret) {
bpf_map_delete_elem(&socket_data_last_id_map, &conid);
bpf_map_delete_elem(&socket_data_id_generate_map, &conid);
struct socket_close_event_t *close_event;
close_event = rover_reserve_buf(&socket_close_event_queue, sizeof(*close_event));
if (close_event == NULL) {
Expand Down
6 changes: 3 additions & 3 deletions bpf/accesslog/common/data_args.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ struct sock_data_args_t {
};
struct {
__uint(type, BPF_MAP_TYPE_HASH);
__uint(max_entries, 10000);
__uint(max_entries, 65535);
__type(key, __u64);
__type(value, struct sock_data_args_t);
} socket_data_args SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_HASH);
__uint(max_entries, 10000);
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__uint(max_entries, 65535);
__type(key, __u64);
__type(value, __u64);
} socket_data_id_generate_map SEC(".maps");
Expand Down
5 changes: 3 additions & 2 deletions bpf/accesslog/syscalls/connect_conntrack.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ static __always_inline int nf_conn_aware(struct pt_regs* ctx, struct nf_conn *ct
}

// already contains the remote address
if (&(connect_args->remote) != NULL) {
if (connect_args->has_remote && &(connect_args->remote) != NULL) {
return 0;
}

Expand Down Expand Up @@ -126,6 +126,7 @@ static __always_inline int nf_conn_aware(struct pt_regs* ctx, struct nf_conn *ct
remote.ipl = reply_conn.saddr_l;
remote.port = reply_conn.sport;
connect_args->remote = remote;
connect_args->has_remote = 1;
bpf_map_update_elem(&conecting_args, &id, connect_args, 0);

return 0;
Expand All @@ -144,4 +145,4 @@ int nf_confirm(struct pt_regs* ctx) {
SEC("kprobe/ctnetlink_fill_info")
int nf_ctnetlink_fill_info(struct pt_regs* ctx) {
return nf_conn_aware(ctx, (struct nf_conn*)PT_REGS_PARM5(ctx));
}
}
2 changes: 1 addition & 1 deletion bpf/accesslog/syscalls/connect_conntrack.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,4 @@ struct nf_conn {
struct nf_conntrack_tuple_hash tuplehash[IP_CT_DIR_MAX];
long unsigned int status;
__u32 mark;
} __attribute__((preserve_access_index));
} __attribute__((preserve_access_index));
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.3.0
github.com/spf13/viper v1.10.1
github.com/stretchr/testify v1.8.1
github.com/stretchr/testify v1.8.4
github.com/zekroTJA/timedmap v1.4.0
golang.org/x/arch v0.0.0-20220722155209-00200b7164a7
golang.org/x/net v0.32.0
Expand Down Expand Up @@ -46,7 +46,7 @@ require (
github.com/json-iterator/go v1.1.12 // indirect
github.com/magiconair/properties v1.8.5 // indirect
github.com/mdlayher/netlink v1.7.2 // indirect
github.com/mdlayher/socket v0.4.1 // indirect
github.com/mdlayher/socket v0.5.1 // indirect
github.com/mitchellh/mapstructure v1.4.3 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
Expand Down
11 changes: 4 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,8 @@ github.com/mdlayher/netlink v1.7.2/go.mod h1:xraEF7uJbxLhc5fpHL4cPe221LI2bdttWlU
github.com/mdlayher/socket v0.0.0-20210307095302-262dc9984e00/go.mod h1:GAFlyu4/XV68LkQKYzKhIo/WW7j3Zi0YRAz/BOoanUc=
github.com/mdlayher/socket v0.0.0-20211007213009-516dcbdf0267/go.mod h1:nFZ1EtZYK8Gi/k6QNu7z7CgO20i/4ExeQswwWuPmG/g=
github.com/mdlayher/socket v0.1.0/go.mod h1:mYV5YIZAfHh4dzDVzI8x8tWLWCliuX8Mon5Awbj+qDs=
github.com/mdlayher/socket v0.4.1 h1:eM9y2/jlbs1M615oshPQOHZzj6R6wMT7bX5NPiQvn2U=
github.com/mdlayher/socket v0.4.1/go.mod h1:cAqeGjoufqdxWkD7DkpyS+wcefOtmu5OQ8KuoJGIReA=
github.com/mdlayher/socket v0.5.1 h1:VZaqt6RkGkt2OE9l3GcC6nZkqD3xKeQLyfleW/uBcos=
github.com/mdlayher/socket v0.5.1/go.mod h1:TjPLHI1UgwEv5J1B5q0zTZq12A/6H7nKmtTanQE37IQ=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso=
github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI=
Expand Down Expand Up @@ -467,7 +467,6 @@ github.com/spf13/viper v1.10.1/go.mod h1:IGlFPqhNAPKRxohIzWpI5QEy4kuI7tcl5WvR+8q
github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
Expand All @@ -476,10 +475,8 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tklauser/go-sysconf v0.3.9 h1:JeUVdAOWhhxVcU6Eqr/ATFHgXk/mmiItdKeJPev3vTo=
Expand Down
79 changes: 41 additions & 38 deletions pkg/accesslog/collector/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"github.com/apache/skywalking-rover/pkg/accesslog/forwarder"
"github.com/apache/skywalking-rover/pkg/logger"
"github.com/apache/skywalking-rover/pkg/module"
"github.com/apache/skywalking-rover/pkg/process"
"github.com/apache/skywalking-rover/pkg/process/api"
"github.com/apache/skywalking-rover/pkg/tools"
"github.com/apache/skywalking-rover/pkg/tools/btf"
"github.com/apache/skywalking-rover/pkg/tools/enums"
Expand All @@ -56,7 +58,7 @@ func NewConnectionCollector() *ConnectCollector {
return &ConnectCollector{}
}

func (c *ConnectCollector) Start(_ *module.Manager, ctx *common.AccessLogContext) error {
func (c *ConnectCollector) Start(m *module.Manager, ctx *common.AccessLogContext) error {
perCPUBufferSize, err := units.RAMInBytes(ctx.Config.ConnectionAnalyze.PerCPUBufferSize)
if err != nil {
return err
Expand All @@ -73,13 +75,9 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx *common.AccessLogContext
if ctx.Config.ConnectionAnalyze.QueueSize < 1 {
return fmt.Errorf("the queue size be small than 1")
}
track, err := ip.NewConnTrack()
if err != nil {
connectionLogger.Warnf("cannot create the connection tracker, %v", err)
}
c.eventQueue = btf.NewEventQueue("connection resolver", ctx.Config.ConnectionAnalyze.AnalyzeParallels,
ctx.Config.ConnectionAnalyze.QueueSize, func(num int) btf.PartitionContext {
return newConnectionPartitionContext(ctx, track)
return NewConnectionPartitionContext(ctx, m.FindModule(process.ModuleName).(process.K8sOperator))
})
c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue, int(perCPUBufferSize),
ctx.Config.ConnectionAnalyze.ParseParallels, func() interface{} {
Expand Down Expand Up @@ -130,18 +128,18 @@ func (c *ConnectCollector) Stop() {

type ConnectionPartitionContext struct {
context *common.AccessLogContext
connTracker *ip.ConnTrack
k8sOperator process.K8sOperator
}

func newConnectionPartitionContext(ctx *common.AccessLogContext, connTracker *ip.ConnTrack) *ConnectionPartitionContext {
func NewConnectionPartitionContext(ctx *common.AccessLogContext,
k8sOperator process.K8sOperator) *ConnectionPartitionContext {
return &ConnectionPartitionContext{
context: ctx,
connTracker: connTracker,
k8sOperator: k8sOperator,
}
}

func (c *ConnectionPartitionContext) Start(ctx context.Context) {

}

func (c *ConnectionPartitionContext) Consume(data interface{}) {
Expand All @@ -151,15 +149,14 @@ func (c *ConnectionPartitionContext) Consume(data interface{}) {
"pid: %d, fd: %d, role: %s: func: %s, family: %d, success: %d, conntrack exist: %t",
event.ConID, event.RandomID, event.PID, event.SocketFD, enums.ConnectionRole(event.Role), enums.SocketFunctionName(event.FuncName),
event.SocketFamily, event.ConnectSuccess, event.ConnTrackUpstreamPort != 0)
socketPair := c.buildSocketFromConnectEvent(event)
socketPair := c.BuildSocketFromConnectEvent(event)
if socketPair == nil {
connectionLogger.Debugf("cannot found the socket paire from connect event, connection ID: %d, randomID: %d",
event.ConID, event.RandomID)
return
}
connectionLogger.Debugf("build socket pair success, connection ID: %d, randomID: %d, role: %s, local: %s:%d, remote: %s:%d",
event.ConID, event.RandomID, socketPair.Role, socketPair.SrcIP, socketPair.SrcPort, socketPair.DestIP, socketPair.DestPort)
c.context.ConnectionMgr.OnConnectEvent(event, socketPair)
forwarder.SendConnectEvent(c.context, event, socketPair)
case *events.SocketCloseEvent:
connectionLogger.Debugf("receive close event, connection ID: %d, randomID: %d, pid: %d, fd: %d",
Expand All @@ -169,7 +166,7 @@ func (c *ConnectionPartitionContext) Consume(data interface{}) {
}
}

func (c *ConnectionPartitionContext) fixSocketFamilyIfNeed(event *events.SocketConnectEvent, result *ip.SocketPair) {
func (c *ConnectionPartitionContext) FixSocketFamilyIfNeed(event *events.SocketConnectEvent, result *ip.SocketPair) {
if result == nil {
return
}
Expand All @@ -189,39 +186,39 @@ func (c *ConnectionPartitionContext) fixSocketFamilyIfNeed(event *events.SocketC
}
}

func (c *ConnectionPartitionContext) buildSocketFromConnectEvent(event *events.SocketConnectEvent) *ip.SocketPair {
func (c *ConnectionPartitionContext) BuildSocketFromConnectEvent(event *events.SocketConnectEvent) *ip.SocketPair {
if event.SocketFamily != unix.AF_INET && event.SocketFamily != unix.AF_INET6 && event.SocketFamily != enums.SocketFamilyUnknown {
// if not ipv4, ipv6 or unknown, ignore
return nil
}
socketPair := c.buildSocketPair(event)
if socketPair != nil && socketPair.IsValid() {
pair := c.BuildSocketPair(event)
if pair != nil && pair.IsValid() {
connectionLogger.Debugf("found the connection from the connect event is valid, connection ID: %d, randomID: %d",
event.ConID, event.RandomID)
return socketPair
return pair
}
// if only the local port not success, maybe the upstream port is not open, so it could be continued
if c.isOnlyLocalPortEmpty(socketPair) {
if c.IsOnlyLocalPortEmpty(pair) {
event.ConnectSuccess = 0
connectionLogger.Debugf("the connection from the connect event is only the local port is empty, connection ID: %d, randomID: %d",
event.ConID, event.RandomID)
return socketPair
return pair
}

pair, err := ip.ParseSocket(event.PID, event.SocketFD)
if err != nil {
connectionLogger.Debugf("cannot found the socket, pid: %d, socket FD: %d", event.PID, event.SocketFD)
connectionLogger.Debugf("cannot found the socket, pid: %d, socket FD: %d, error: %v", event.PID, event.SocketFD, err)
return nil
}
connectionLogger.Debugf("found the connection from the socket, connection ID: %d, randomID: %d",
event.ConID, event.RandomID)
pair.Role = enums.ConnectionRole(event.Role)
c.fixSocketFamilyIfNeed(event, pair)
c.tryToUpdateSocketFromConntrack(event, pair)
c.FixSocketFamilyIfNeed(event, pair)
c.CheckNeedConntrack(event, pair)
return pair
}

func (c *ConnectionPartitionContext) isOnlyLocalPortEmpty(socketPair *ip.SocketPair) bool {
func (c *ConnectionPartitionContext) IsOnlyLocalPortEmpty(socketPair *ip.SocketPair) bool {
if socketPair == nil {
return false
}
Expand All @@ -233,7 +230,7 @@ func (c *ConnectionPartitionContext) isOnlyLocalPortEmpty(socketPair *ip.SocketP
return socketPair.IsValid()
}

func (c *ConnectionPartitionContext) buildSocketPair(event *events.SocketConnectEvent) *ip.SocketPair {
func (c *ConnectionPartitionContext) BuildSocketPair(event *events.SocketConnectEvent) *ip.SocketPair {
var result *ip.SocketPair
haveConnTrack := false
if event.SocketFamily == unix.AF_INET {
Expand Down Expand Up @@ -288,22 +285,28 @@ func (c *ConnectionPartitionContext) buildSocketPair(event *events.SocketConnect
return result
}

c.fixSocketFamilyIfNeed(event, result)
c.tryToUpdateSocketFromConntrack(event, result)
c.FixSocketFamilyIfNeed(event, result)
c.CheckNeedConntrack(event, result)
return result
}

func (c *ConnectionPartitionContext) tryToUpdateSocketFromConntrack(event *events.SocketConnectEvent, socket *ip.SocketPair) {
if socket != nil && socket.IsValid() && c.connTracker != nil && !tools.IsLocalHostAddress(socket.DestIP) &&
event.FuncName != enums.SocketFunctionNameAccept { // accept event don't need to update the remote address
// if no contract and socket data is valid, then trying to get the remote address from the socket
// to encase the remote address is not the real remote address
originalIP := socket.DestIP
originalPort := socket.DestPort
if c.connTracker.UpdateRealPeerAddress(socket) {
connectionLogger.Debugf("update the socket address from conntrack success, "+
"connection ID: %d, randomID: %d, original remote: %s:%d, new remote: %s:%d",
event.ConID, event.RandomID, originalIP, originalPort, socket.DestIP, socket.DestPort)
}
func (c *ConnectionPartitionContext) CheckNeedConntrack(event *events.SocketConnectEvent, socket *ip.SocketPair) {
if socket == nil || !socket.IsValid() || tools.IsLocalHostAddress(socket.DestIP) ||
event.FuncName == enums.SocketFunctionNameAccept || // accept event don't need to update the remote address
!c.context.ConnectionMgr.ProcessIsDetectBy(event.PID, api.Kubernetes) { // only the k8s process need to update the remote address from conntrack
return
}

isPodIP, err := c.k8sOperator.IsPodIP(socket.DestIP)
if err != nil {
connectionLogger.Warnf("cannot found the pod IP, connection ID: %d, randomID: %d, error: %v",
event.ConID, event.RandomID, err)
}
if isPodIP {
connectionLogger.Debugf("detect the remote IP is pod IP, connection ID: %d, randomID: %d, remote: %s",
event.ConID, event.RandomID, socket.DestIP)
return
}
// update to the socket need to update the remote address from conntrack
socket.NeedConnTrack = true
}
1 change: 0 additions & 1 deletion pkg/accesslog/collector/protocols/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ type PartitionConnection struct {
protocolAnalyzer map[enums.ConnectionProtocol]Protocol
protocolMetrics map[enums.ConnectionProtocol]ProtocolMetrics
closed bool
closeCallback common.ConnectionProcessFinishCallback
skipAllDataAnalyze bool
lastCheckCloseTime time.Time
}
Expand Down
Loading

0 comments on commit 935aece

Please sign in to comment.