@@ -44,15 +44,15 @@ import (
44
44
"golang.org/x/sys/unix"
45
45
)
46
46
47
- var connectLogger = logger .GetLogger ("access_log" , "collector" , "connect " )
47
+ var connectionLogger = logger .GetLogger ("access_log" , "collector" , "connection " )
48
48
49
- var connectCollectInstance = NewConnectCollector ()
49
+ var connectionCollectInstance = NewConnectionCollector ()
50
50
51
51
type ConnectCollector struct {
52
52
eventQueue * btf.EventQueue
53
53
}
54
54
55
- func NewConnectCollector () * ConnectCollector {
55
+ func NewConnectionCollector () * ConnectCollector {
56
56
return & ConnectCollector {}
57
57
}
58
58
@@ -72,7 +72,7 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx *common.AccessLogContext
72
72
}
73
73
track , err := ip .NewConnTrack ()
74
74
if err != nil {
75
- connectLogger .Warnf ("cannot create the connection tracker, %v" , err )
75
+ connectionLogger .Warnf ("cannot create the connection tracker, %v" , err )
76
76
}
77
77
c .eventQueue = btf .NewEventQueue (ctx .Config .ConnectionAnalyze .Parallels , ctx .Config .ConnectionAnalyze .QueueSize , func (num int ) btf.PartitionContext {
78
78
return newConnectionPartitionContext (ctx , track )
@@ -82,6 +82,11 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx *common.AccessLogContext
82
82
}, func (data interface {}) string {
83
83
return fmt .Sprintf ("%d" , data .(* events.SocketConnectEvent ).ConID )
84
84
})
85
+ c .eventQueue .RegisterReceiver (ctx .BPF .SocketCloseEventQueue , int (perCPUBufferSize ), func () interface {} {
86
+ return & events.SocketCloseEvent {}
87
+ }, func (data interface {}) string {
88
+ return fmt .Sprintf ("%d" , data .(* events.SocketCloseEvent ).ConnectionID )
89
+ })
85
90
c .eventQueue .Start (ctx .RuntimeContext , ctx .BPF .Linker )
86
91
87
92
ctx .BPF .AddTracePoint ("syscalls" , "sys_enter_connect" , ctx .BPF .TracepointEnterConnect )
@@ -90,6 +95,8 @@ func (c *ConnectCollector) Start(_ *module.Manager, ctx *common.AccessLogContext
90
95
ctx .BPF .AddTracePoint ("syscalls" , "sys_exit_accept" , ctx .BPF .TracepointExitAccept )
91
96
ctx .BPF .AddTracePoint ("syscalls" , "sys_enter_accept4" , ctx .BPF .TracepointEnterAccept )
92
97
ctx .BPF .AddTracePoint ("syscalls" , "sys_exit_accept4" , ctx .BPF .TracepointExitAccept )
98
+ ctx .BPF .AddTracePoint ("syscalls" , "sys_enter_close" , ctx .BPF .TracepointEnterClose )
99
+ ctx .BPF .AddTracePoint ("syscalls" , "sys_exit_close" , ctx .BPF .TracepointExitClose )
93
100
94
101
ctx .BPF .AddLink (link .Kprobe , map [string ]* ebpf.Program {
95
102
"tcp_connect" : ctx .BPF .TcpConnect ,
@@ -133,21 +140,28 @@ func (c *ConnectionPartitionContext) Start(ctx context.Context) {
133
140
}
134
141
135
142
func (c * ConnectionPartitionContext ) Consume (data interface {}) {
136
- event := data .(* events.SocketConnectEvent )
137
- connectLogger .Debugf ("receive connect event, connection ID: %d, randomID: %d, " +
138
- "pid: %d, fd: %d, role: %s: func: %s, family: %d, success: %d, conntrack exist: %t" ,
139
- event .ConID , event .RandomID , event .PID , event .SocketFD , enums .ConnectionRole (event .Role ), enums .SocketFunctionName (event .FuncName ),
140
- event .SocketFamily , event .ConnectSuccess , event .ConnTrackUpstreamPort != 0 )
141
- socketPair := c .buildSocketFromConnectEvent (event )
142
- if socketPair == nil {
143
- connectLogger .Debugf ("cannot found the socket paire from connect event, connection ID: %d, randomID: %d" ,
144
- event .ConID , event .RandomID )
145
- return
143
+ switch event := data .(type ) {
144
+ case * events.SocketConnectEvent :
145
+ connectionLogger .Debugf ("receive connect event, connection ID: %d, randomID: %d, " +
146
+ "pid: %d, fd: %d, role: %s: func: %s, family: %d, success: %d, conntrack exist: %t" ,
147
+ event .ConID , event .RandomID , event .PID , event .SocketFD , enums .ConnectionRole (event .Role ), enums .SocketFunctionName (event .FuncName ),
148
+ event .SocketFamily , event .ConnectSuccess , event .ConnTrackUpstreamPort != 0 )
149
+ socketPair := c .buildSocketFromConnectEvent (event )
150
+ if socketPair == nil {
151
+ connectionLogger .Debugf ("cannot found the socket paire from connect event, connection ID: %d, randomID: %d" ,
152
+ event .ConID , event .RandomID )
153
+ return
154
+ }
155
+ connectionLogger .Debugf ("build socket pair success, connection ID: %d, randomID: %d, role: %s, local: %s:%d, remote: %s:%d" ,
156
+ event .ConID , event .RandomID , socketPair .Role , socketPair .SrcIP , socketPair .SrcPort , socketPair .DestIP , socketPair .DestPort )
157
+ c .context .ConnectionMgr .OnConnectEvent (event , socketPair )
158
+ forwarder .SendConnectEvent (c .context , event , socketPair )
159
+ case * events.SocketCloseEvent :
160
+ connectionLogger .Debugf ("receive close event, connection ID: %d, randomID: %d, pid: %d, fd: %d" ,
161
+ event .ConnectionID , event .RandomID , event .PID , event .SocketFD )
162
+ wapperedEvent := c .context .ConnectionMgr .OnConnectionClose (event )
163
+ forwarder .SendCloseEvent (c .context , wapperedEvent )
146
164
}
147
- connectLogger .Debugf ("build socket pair success, connection ID: %d, randomID: %d, role: %s, local: %s:%d, remote: %s:%d" ,
148
- event .ConID , event .RandomID , socketPair .Role , socketPair .SrcIP , socketPair .SrcPort , socketPair .DestIP , socketPair .DestPort )
149
- c .context .ConnectionMgr .OnConnectEvent (event , socketPair )
150
- forwarder .SendConnectEvent (c .context , event , socketPair )
151
165
}
152
166
153
167
func (c * ConnectionPartitionContext ) fixSocketFamilyIfNeed (event * events.SocketConnectEvent , result * ip.SocketPair ) {
@@ -163,7 +177,7 @@ func (c *ConnectionPartitionContext) fixSocketFamilyIfNeed(event *events.SocketC
163
177
}
164
178
165
179
if result .Family != actual {
166
- connectLogger .Debugf ("fix the socket family from %d to %d, connection ID: %d, randomID: %d" ,
180
+ connectionLogger .Debugf ("fix the socket family from %d to %d, connection ID: %d, randomID: %d" ,
167
181
result .Family , actual , event .ConID , event .RandomID )
168
182
result .Family = actual
169
183
}
@@ -177,24 +191,24 @@ func (c *ConnectionPartitionContext) buildSocketFromConnectEvent(event *events.S
177
191
}
178
192
socketPair := c .buildSocketPair (event )
179
193
if socketPair != nil && socketPair .IsValid () {
180
- connectLogger .Debugf ("found the connection from the connect event is valid, connection ID: %d, randomID: %d" ,
194
+ connectionLogger .Debugf ("found the connection from the connect event is valid, connection ID: %d, randomID: %d" ,
181
195
event .ConID , event .RandomID )
182
196
return socketPair
183
197
}
184
198
// if only the local port not success, maybe the upstream port is not open, so it could be continued
185
199
if c .isOnlyLocalPortEmpty (socketPair ) {
186
200
event .ConnectSuccess = 0
187
- connectLogger .Debugf ("the connection from the connect event is only the local port is empty, connection ID: %d, randomID: %d" ,
201
+ connectionLogger .Debugf ("the connection from the connect event is only the local port is empty, connection ID: %d, randomID: %d" ,
188
202
event .ConID , event .RandomID )
189
203
return socketPair
190
204
}
191
205
192
206
pair , err := ip .ParseSocket (event .PID , event .SocketFD )
193
207
if err != nil {
194
- connectLogger .Debugf ("cannot found the socket, pid: %d, socket FD: %d" , event .PID , event .SocketFD )
208
+ connectionLogger .Debugf ("cannot found the socket, pid: %d, socket FD: %d" , event .PID , event .SocketFD )
195
209
return nil
196
210
}
197
- connectLogger .Debugf ("found the connection from the socket, connection ID: %d, randomID: %d" ,
211
+ connectionLogger .Debugf ("found the connection from the socket, connection ID: %d, randomID: %d" ,
198
212
event .ConID , event .RandomID )
199
213
pair .Role = enums .ConnectionRole (event .Role )
200
214
c .fixSocketFamilyIfNeed (event , pair )
@@ -229,8 +243,8 @@ func (c *ConnectionPartitionContext) buildSocketPair(event *events.SocketConnect
229
243
result .DestIP = ip .ParseIPV4 (uint32 (event .ConnTrackUpstreamIPl ))
230
244
result .DestPort = uint16 (event .ConnTrackUpstreamPort )
231
245
232
- if connectLogger .Enable (logrus .DebugLevel ) {
233
- connectLogger .Debugf ("found the connection from the conntrack, connection ID: %d, randomID: %d, original: %s:%d, conntrack: %s:%d" ,
246
+ if connectionLogger .Enable (logrus .DebugLevel ) {
247
+ connectionLogger .Debugf ("found the connection from the conntrack, connection ID: %d, randomID: %d, original: %s:%d, conntrack: %s:%d" ,
234
248
event .ConID , event .RandomID , ip .ParseIPV4 (event .RemoteAddrV4 ), uint16 (event .RemoteAddrPort ), result .DestIP , result .DestPort )
235
249
}
236
250
} else {
@@ -255,8 +269,8 @@ func (c *ConnectionPartitionContext) buildSocketPair(event *events.SocketConnect
255
269
result .DestIP = ip .ParseIPV4 (uint32 (event .ConnTrackUpstreamIPl ))
256
270
}
257
271
result .DestPort = uint16 (event .ConnTrackUpstreamPort )
258
- if connectLogger .Enable (logrus .DebugLevel ) {
259
- connectLogger .Debugf ("found the connection from the conntrack, connection ID: %d, randomID: %d, original: %s:%d, conntrack: %s:%d" ,
272
+ if connectionLogger .Enable (logrus .DebugLevel ) {
273
+ connectionLogger .Debugf ("found the connection from the conntrack, connection ID: %d, randomID: %d, original: %s:%d, conntrack: %s:%d" ,
260
274
event .ConID , event .RandomID , ip .ParseIPV6 (event .RemoteAddrV6 ), uint16 (event .RemoteAddrPort ), result .DestIP , result .DestPort )
261
275
}
262
276
} else {
@@ -282,7 +296,7 @@ func (c *ConnectionPartitionContext) tryToUpdateSocketFromConntrack(event *event
282
296
originalIP := socket .DestIP
283
297
originalPort := socket .DestPort
284
298
if c .connTracker .UpdateRealPeerAddress (socket ) {
285
- connectLogger .Debugf ("update the socket address from conntrack success, " +
299
+ connectionLogger .Debugf ("update the socket address from conntrack success, " +
286
300
"connection ID: %d, randomID: %d, original remote: %s:%d, new remote: %s:%d" ,
287
301
event .ConID , event .RandomID , originalIP , originalPort , socket .DestIP , socket .DestPort )
288
302
}
0 commit comments