Skip to content

Commit

Permalink
Fix some NPE and update logs in the Access Log Module
Browse files Browse the repository at this point in the history
  • Loading branch information
mrproliu committed Jan 18, 2024
1 parent f20f8bc commit e254e83
Show file tree
Hide file tree
Showing 12 changed files with 61 additions and 17 deletions.
2 changes: 2 additions & 0 deletions bpf/accesslog/syscalls/transfer.h
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,8 @@ static __always_inline void process_write_data(void *ctx, __u64 id, struct sock_
if (ssl_data_args != NULL && ssl_data_args->fd == 0) {
ssl_data_args->fd = args->fd;
conn->ssl = true;
} else if (ssl) {
conn->ssl = true;
}

// if the package size and count cannot be obtained, then try to get them from the data args
Expand Down
3 changes: 3 additions & 0 deletions bpf/accesslog/tls/tls.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
#include "socket_opts.h"

static __always_inline void set_conn_as_ssl(struct pt_regs* ctx, __u32 tgid, __u32 fd, __u32 func_name) {
if (fd <= 0) {
return;
}
struct active_connection_t* conn = get_or_create_active_conn(ctx, tgid, fd, func_name, CONNECTION_ROLE_TYPE_UNKNOWN);
if (conn == NULL) {
return;
Expand Down
3 changes: 3 additions & 0 deletions configs/rover_configs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,10 @@ profiling:
access_log:
# Whether the access log monitoring is active
active: ${ROVER_ACCESS_LOG_ACTIVE:false}
# Exclude processes in the specified Kubernetes namespaces. Multiple namespaces are separated by ","
exclude_namespaces: ${ROVER_ACCESS_LOG_EXCLUDE_NAMESPACES:istio-system,cert-manager,kube-system}
# Exclude processes in the specified clusters, which are defined in the process module. Multiple clusters are separated by ","
exclude_cluster: ${ROVER_ACCESS_LOG_EXCLUDE_CLUSTER:}
flush:
# The max count of access logs per flush to the backend
max_count: ${ROVER_ACCESS_LOG_FLUSH_MAX_COUNT:10000}
Expand Down
18 changes: 10 additions & 8 deletions docs/en/setup/configuration/accesslog.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ and send [access logs](https://github.com/apache/skywalking-data-collect-protoco

## Configuration

| Name | Default | Environment Key | Description |
|--------------------------------------------|---------|--------------------------------------------------|------------------------------------------------------------|
| access_log.active | false | ROVER_ACCESS_LOG_ACTIVE | Is active the access log monitoring. |
| access_log.flush.max_count | 2000 | ROVER_ACCESS_LOG_FLUSH_MAX_COUNT | The max count of the access log when flush to the backend. |
| access_log.flush.period | 5s | ROVER_ACCESS_LOG_FLUSH_PERIOD | The period of flush access log to the backend. |
| access_log_protocol_analyze.per_cpu_buffer | 400KB | ROVER_ACCESS_LOG_PROTOCOL_ANALYZE_PER_CPU_BUFFER | The size of socket data buffer on each CPU. |
| access_log.protocol_analyze.parallels | 2 | ROVER_ACCESS_LOG_PROTOCOL_ANALYZE_PARALLELS | The count of parallel protocol analyzer. |
| access_log.protocol_analyze.queue_size | 5000 | ROVER_ACCESS_LOG_PROTOCOL_ANALYZE_QUEUE_SIZE | The size of per paralleled analyze queue. |
| Name | Default | Environment Key | Description |
|--------------------------------------------|---------------------------------------|--------------------------------------------------|----------------------------------------------------------------------------------------------------------------|
| access_log.active | false | ROVER_ACCESS_LOG_ACTIVE | Whether the access log monitoring is active. |
| access_log.exclude_namespaces | istio-system,cert-manager,kube-system | ROVER_ACCESS_LOG_EXCLUDE_NAMESPACES | Exclude processes in the specified Kubernetes namespaces. Multiple namespaces are separated by ",". |
| access_log.exclude_cluster | | ROVER_ACCESS_LOG_EXCLUDE_CLUSTER | Exclude processes in the specified clusters, which are defined in the process module. Multiple clusters are separated by ",". |
| access_log.flush.max_count | 10000 | ROVER_ACCESS_LOG_FLUSH_MAX_COUNT | The max count of access logs per flush to the backend. |
| access_log.flush.period | 5s | ROVER_ACCESS_LOG_FLUSH_PERIOD | The period of flush access log to the backend. |
| access_log.protocol_analyze.per_cpu_buffer | 400KB | ROVER_ACCESS_LOG_PROTOCOL_ANALYZE_PER_CPU_BUFFER | The size of socket data buffer on each CPU. |
| access_log.protocol_analyze.parallels | 2 | ROVER_ACCESS_LOG_PROTOCOL_ANALYZE_PARALLELS | The count of parallel protocol analyzer. |
| access_log.protocol_analyze.queue_size | 5000 | ROVER_ACCESS_LOG_PROTOCOL_ANALYZE_QUEUE_SIZE | The size of per paralleled analyze queue. |


## Collectors
Expand Down
4 changes: 2 additions & 2 deletions pkg/accesslog/collector/protocols/http1.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ func (p *HTTP1Protocol) GenerateConnection(connectionID, randomID uint64) Protoc

func (p *HTTP1Protocol) Analyze(metrics ProtocolMetrics, buf *buffer.Buffer, _ *AnalyzeHelper) error {
http1Metrics := metrics.(*HTTP1Metrics)
http1Log.Debugf("ready to analyze HTTP/1 protocol data, connection ID: %d, random ID: %d",
http1Metrics.connectionID, http1Metrics.randomID)
http1Log.Debugf("ready to analyze HTTP/1 protocol data, connection ID: %d, random ID: %d, data len: %d",
http1Metrics.connectionID, http1Metrics.randomID, buf.DataLength())
buf.ResetForLoopReading()
for {
if !buf.PrepareForReading() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/accesslog/collector/protocols/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type Protocol interface {
}

func appendSocketDetailsFromBuffer(result []*events.SocketDetailEvent, buf *buffer.Buffer) []*events.SocketDetailEvent {
if buf.DetailLength() == 0 {
if buf == nil || buf.DetailLength() == 0 {
return result
}
for e := buf.Details().Front(); e != nil; e = e.Next() {
Expand Down
11 changes: 7 additions & 4 deletions pkg/accesslog/collector/protocols/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,9 @@ func (p *PartitionContext) Consume(data interface{}) {
case *events.SocketDetailEvent:
pid, _ := events.ParseConnectionID(event.ConnectionID)
log.Debugf("receive the socket detail event, connection ID: %d, random ID: %d, pid: %d, data id: %d, "+
"function name: %s, package count: %d, package size: %d, l4 duration: %d",
"function name: %s, package count: %d, package size: %d, l4 duration: %d, ssl: %d",
event.ConnectionID, event.RandomID, pid, event.DataID0, event.FunctionName,
event.L4PackageCount, event.L4TotalPackageSize, event.L4Duration)
event.L4PackageCount, event.L4TotalPackageSize, event.L4Duration, event.SSL)
if event.Protocol == enums.ConnectionProtocolUnknown {
// if the connection protocol is unknown, we just needs to add this into the kernel log
forwarder.SendTransferNoProtocolEvent(p.context, event)
Expand Down Expand Up @@ -244,12 +244,14 @@ func (p *PartitionContext) checkTheConnectionIsAlreadyClose(con *PartitionConnec
if err := p.context.BPF.ActiveConnectionMap.Lookup(con.connectionID, &activateConn); err != nil {
if errors.Is(err, ebpf.ErrKeyNotExist) {
con.closed = true
log.Debugf("detect the connection: %d-%d is already closed(by key not exist), so remove from the activate connection",
con.connectionID, con.randomID)
return
}
log.Warnf("cannot found the active connection: %d-%d, err: %v", con.connectionID, con.randomID, err)
return
} else if activateConn.RandomID != 0 && activateConn.RandomID != con.randomID {
log.Debugf("detect the connection: %d-%d is already closed, so remove from the activate connection",
log.Debugf("detect the connection: %d-%d is already closed(by difference random ID), so remove from the activate connection",
con.connectionID, con.randomID)
con.closed = true
}
Expand All @@ -267,7 +269,8 @@ func (p *PartitionContext) processExpireEvents() {

func (p *PartitionContext) processConnectionExpireEvents(connection *PartitionConnection) {
if c := connection.dataBuffer.DeleteExpireEvents(maxBufferExpireDuration); c > 0 {
log.Debugf("total removed %d expired socket data events", c)
log.Debugf("total removed %d expired socket data events from connection ID: %d, random ID: %d", c,
connection.connectionID, connection.randomID)
}
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/accesslog/collector/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ func (t *TransferCollector) Start(_ *module.Manager, context *common.AccessLogCo
context.BPF.AddTracePoint("syscalls", "sys_exit_write", context.BPF.TracepointExitWrite)
context.BPF.AddTracePoint("syscalls", "sys_enter_read", context.BPF.TracepointEnterRead)
context.BPF.AddTracePoint("syscalls", "sys_exit_read", context.BPF.TracepointExitRead)
context.BPF.AddTracePoint("syscalls", "sys_enter_readv", context.BPF.TracepointEnterReadv)
context.BPF.AddTracePoint("syscalls", "sys_exit_readv", context.BPF.TracepointExitReadv)
context.BPF.AddTracePoint("syscalls", "sys_enter_sendto", context.BPF.TracepointEnterSendto)
context.BPF.AddTracePoint("syscalls", "sys_exit_sendto", context.BPF.TracepointExitSendto)
context.BPF.AddTracePoint("syscalls", "sys_enter_writev", context.BPF.TracepointEnterWritev)
Expand Down
1 change: 1 addition & 0 deletions pkg/accesslog/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Config struct {

Active bool `mapstructure:"active"`
ExcludeNamespaces string `mapstructure:"exclude_namespaces"`
ExcludeClusters string `mapstructure:"exclude_cluster"`
Flush FlushConfig `mapstructure:"flush"`
ProtocolAnalyze ProtocolAnalyzeConfig `mapstructure:"protocol_analyze"`
}
Expand Down
23 changes: 21 additions & 2 deletions pkg/accesslog/common/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ type ConnectionManager struct {
activeConnectionMap *ebpf.Map

excludeNamespaces map[string]bool
excludeClusters map[string]bool

processors []ConnectionProcessor
processListeners []ProcessListener
Expand Down Expand Up @@ -135,6 +136,10 @@ func NewConnectionManager(config *Config, moduleMgr *module.Manager, bpfLoader *
for _, ns := range strings.Split(config.ExcludeNamespaces, ",") {
excludeNamespaces[ns] = true
}
excludeClusters := make(map[string]bool)
for _, cluster := range strings.Split(config.ExcludeClusters, ",") {
excludeClusters[cluster] = true
}
mgr := &ConnectionManager{
moduleMgr: moduleMgr,
processOP: moduleMgr.FindModule(process.ModuleName).(process.Operator),
Expand All @@ -147,6 +152,7 @@ func NewConnectionManager(config *Config, moduleMgr *module.Manager, bpfLoader *
activeConnectionMap: bpfLoader.ActiveConnectionMap,
allUnfinishedConnections: make(map[string]*bool),
excludeNamespaces: excludeNamespaces,
excludeClusters: excludeClusters,
}
return mgr
}
Expand Down Expand Up @@ -295,6 +301,9 @@ func (c *ConnectionManager) buildAddressFromLocalKubernetesProcess(pid uint32, p
for _, pi := range c.monitoringProcesses[int32(pid)] {
if pi.DetectType() == api.Kubernetes {
entity := pi.Entity()
if cluster, _, found := strings.Cut(entity.ServiceName, "::"); found && c.excludeClusters[cluster] {
continue
}
podContainer := pi.DetectProcess().(*kubernetes.Process).PodContainer()
return &v3.ConnectionAddress{
Address: &v3.ConnectionAddress_Kubernetes{
Expand Down Expand Up @@ -455,15 +464,25 @@ func (c *ConnectionManager) printTotalAddressesWithPid(prefix string) {
}

func (c *ConnectionManager) shouldExcludeTheProcess(entities []api.ProcessInterface) bool {
// when the process contains multiple entities and at least one of them belongs to a cluster that is not excluded, the process should not be excluded
containsNotExcludeCluster := false
for _, entity := range entities {
if entity.DetectType() == api.Kubernetes {
if entity.DetectType() == api.Kubernetes { // for now, we only have the kubernetes detected process
namespace := entity.DetectProcess().(*kubernetes.Process).PodContainer().Pod.Namespace
if c.excludeNamespaces[namespace] {
return true
}
if cluster, _, found := strings.Cut(entity.Entity().ServiceName, "::"); found {
if !c.excludeClusters[cluster] {
containsNotExcludeCluster = true
}
} else {
containsNotExcludeCluster = true
break
}
}
}
return false
return !containsNotExcludeCluster
}

func (c *ConnectionManager) RemoveProcess(pid int32, entities []api.ProcessInterface) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/tools/btf/linker.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ func (u *UProbeExeFile) AddLinkWithType(symbol string, enter bool, p *ebpf.Progr
if !u.found {
return
}
if p == nil {
return
}
lk, err := u.addLinkWithType0(symbol, enter, p, 0)
if err != nil {
u.linker.errors = multierror.Append(u.linker.errors, fmt.Errorf("file: %s, symbol: %s, type: %s, error: %v",
Expand Down
6 changes: 6 additions & 0 deletions pkg/tools/buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ func (r *Buffer) Slice(validated bool, start, end *Position) *Buffer {
detailEvents := list.New()
var firstDetailElement *list.Element
for nextElement := start.element; nextElement != end.element; nextElement = nextElement.Next() {
if nextElement == nil {
break
}
// found first matches detail event
if detailEvents.Len() == 0 || firstDetailElement == nil {
for e := r.detailEvents.Front(); e != nil; e = e.Next() {
Expand Down Expand Up @@ -215,6 +218,9 @@ func (r *Buffer) Details() *list.List {
}

func (r *Buffer) DataSize() int64 {
if r == nil {
return 0
}
var result int64
var headPosition = r.head
if headPosition == nil {
Expand Down

0 comments on commit e254e83

Please sign in to comment.