Skip to content

Commit

Permalink
getparents code reorganization
Browse files Browse the repository at this point in the history
Deoptimizing GetParents() until we figure out how to do it without
leaking mem.
  • Loading branch information
gustavo-iniguez-goya committed Oct 3, 2023
1 parent 7f493e8 commit 19d376a
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 85 deletions.
2 changes: 2 additions & 0 deletions daemon/conman/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,8 @@ func (c *Connection) String() string {

// Serialize returns a connection serialized.
func (c *Connection) Serialize() *protocol.Connection {
c.Process.RLock()
defer c.Process.RUnlock()
return &protocol.Connection{
Protocol: c.Protocol,
SrcIp: c.SrcIP.String(),
Expand Down
6 changes: 1 addition & 5 deletions daemon/procmon/activepids.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,12 @@ func MonitorProcEvents(stop <-chan struct{}) {
if _, needsUpdate, found := EventsCache.IsInStore(int(ev.PID), proc); found {
if needsUpdate {
EventsCache.ComputeChecksums(proc)
EventsCache.UpdateItemDetails(proc)
EventsCache.UpdateItem(proc)
}
log.Debug("[procmon exec event inCache] %d, pid:%d tgid:%d\n", ev.TimeStamp, ev.PID, ev.TGID)
continue
}
// adding item to cache in 2 steps:
// 1. with basic information, to have it readily available
// 2. getting the rest of the process details
EventsCache.Add(proc)
EventsCache.UpdateItemDetails(proc)
} else if ev.IsExit() {
p, _, found := EventsCache.IsInStore(int(ev.PID), nil)
if found && p.Proc.IsAlive() == false {
Expand Down
17 changes: 3 additions & 14 deletions daemon/procmon/cache_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ func (e *EventsStore) UpdateItem(proc *Process) {
if proc.Path == "" {
return
}
e.mu.Lock()
ev := &ExecEventItem{
Proc: proc,
LastSeen: time.Now().UnixNano(),
}
e.mu.Lock()
e.eventByPID[proc.ID] = ev
e.eventByPath[proc.Path] = ev
e.mu.Unlock()
Expand Down Expand Up @@ -185,15 +185,6 @@ func (e *EventsStore) DeleteOldItems() {
}
}

// UpdateItemDetails updates the details of a process
func (e *EventsStore) UpdateItemDetails(proc *Process) {
proc.GetParent()
proc.GetTree()
proc.ReadCwd()
proc.ReadEnv()
e.UpdateItem(proc)
}

// -------------------------------------------------------------------------
// TODO: Move to its own package.
// A hashing service than runs in background, and accepts paths to hash
Expand Down Expand Up @@ -247,17 +238,15 @@ func (e *EventsStore) ComputeChecksums(proc *Process) {
// pid found in cache
// we should check other parameters to see if the pid is really the same process
// proc/<pid>/maps
item.Proc.mu.RLock()
item.Proc.RLock()
checksumsNum := len(item.Proc.Checksums)
item.Proc.mu.RUnlock()
item.Proc.RUnlock()
if checksumsNum > 0 && (item.Proc.IsAlive() && item.Proc.Path == proc.Path) {
log.Debug("[cache] reuseChecksums() cached PID alive, already hashed: %v, %s new: %s", item.Proc.Checksums, item.Proc.Path, proc.Path)
proc.Checksums = item.Proc.Checksums
return
}
item.Proc.mu.RLock()
log.Debug("[cache] reuseChecksums() PID found inCache, computing hashes: %s new: %s - hashes: |%v<>%v|", item.Proc.Path, proc.Path, item.Proc.Checksums, proc.Checksums)
item.Proc.mu.RUnlock()

proc.ComputeChecksums(e.checksums)
}
Expand Down
43 changes: 12 additions & 31 deletions daemon/procmon/details.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ var socketsRegex, _ = regexp.Compile(`socket:\[([0-9]+)\]`)

// GetParent obtains the information of this process' parent.
func (p *Process) GetParent() {
p.mu.RLock()
hasParent := p.Parent != nil
p.mu.RUnlock()

if hasParent {
return
Expand All @@ -50,44 +48,29 @@ func (p *Process) GetParent() {
return
}

if item, found := EventsCache.IsInStoreByPID(ppid); found {
p.mu.Lock()
p.Parent = item.Proc
p.mu.Unlock()

EventsCache.UpdateItem(p)
} else {
p.mu.Lock()
p.Parent = NewProcessEmpty(ppid, "")
p.Parent.ReadPath()
p.mu.Unlock()
EventsCache.Add(p.Parent)
}
// TODO: see how we can reuse this object and the ppid, to save some iterations.
// right now it opens the can of leaks.
p.Parent = NewProcessEmpty(ppid, "")
p.Parent.ReadPath()

// get process tree
p.Parent.GetParent()
}

// GetTree returns all the parents of this process.
func (p *Process) GetTree() {
// BuildTree returns all the parents of this process.
func (p *Process) BuildTree() {
if len(p.Tree) > 0 {
fmt.Println("GetTree not empty:", p.Tree)
return
}
tree := make([]*protocol.StringInt, 0)
for pp := p.Parent; pp != nil; pp = pp.Parent {
// add the parents in reverse order, so when we iterate over them with the rules
// the first item is the most direct parent of the process.
pp.mu.RLock()
tree = append(tree,
p.Tree = append(p.Tree,
&protocol.StringInt{
Key: pp.Path, Value: uint32(pp.ID),
},
)
pp.mu.RUnlock()
}
p.mu.Lock()
p.Tree = tree
p.mu.Unlock()
}

// GetDetails collects information of a process.
Expand Down Expand Up @@ -153,9 +136,7 @@ func (p *Process) ReadCwd() error {
if err != nil {
return err
}
p.mu.Lock()
p.CWD = link
p.mu.Unlock()
return nil
}

Expand Down Expand Up @@ -458,9 +439,9 @@ func (p *Process) ComputeChecksum(algo string) {
log.Debug("[hashing] Unable to dump process memory: %s", err)
continue
}
p.mu.Lock()
p.Lock()
p.Checksums[algo] = hex.EncodeToString(h.Sum(code))
p.mu.Unlock()
p.Unlock()
log.Debug("[hashing] memory region hashed, elapsed: %v ,Hash: %s, %s\n", time.Since(start), p.Checksums[algo], paths[i])
code = nil
break
Expand All @@ -471,9 +452,9 @@ func (p *Process) ComputeChecksum(algo string) {
log.Debug("[hashing %s] Error copying data: %s", algo, err)
continue
}
p.mu.Lock()
p.Lock()
p.Checksums[algo] = hex.EncodeToString(h.Sum(nil))
p.mu.Unlock()
p.Unlock()
log.Debug("[hashing] elapsed: %v ,Hash: %s, %s\n", time.Since(start), p.Checksums[algo], paths[i])

break
Expand Down
75 changes: 43 additions & 32 deletions daemon/procmon/ebpf/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,40 +165,10 @@ func streamEventsWorker(id int, chn chan []byte, lost chan uint64, kernelEvents
} else {
switch event.Type {
case EV_TYPE_EXEC, EV_TYPE_EXECVEAT:
proc := event2process(&event)
if proc == nil {
continue
}
// TODO: store multiple executions with the same pid but different paths: forks, execves...
if item, needsUpdate, found := procmon.EventsCache.IsInStore(int(event.PID), proc); found {
if needsUpdate {
// when a process is replaced in memory, it'll be found in cache by PID,
// but the new process' details will be empty
proc.Parent = item.Proc
procmon.EventsCache.ComputeChecksums(proc)
procmon.EventsCache.UpdateItemDetails(proc)
}
log.Debug("[eBPF event inCache] -> %d, %v", event.PID, item.Proc.Checksums)
continue
}
// adding item to cache in 2 steps:
// 1. with basic information, to have it readily available
// 2. getting the rest of the process details that takes more time
procmon.EventsCache.Add(proc)
procmon.EventsCache.UpdateItemDetails(proc)
processExecEvent(&event)

case EV_TYPE_SCHED_EXIT:
log.Debug("[eBPF exit event] total: %d, pid: %d, ppid: %d", 0 /*execEvents.Len()*/, event.PID, event.PPID)
ev, _, found := procmon.EventsCache.IsInStore(int(event.PID), nil)
if !found {
continue
}
log.Debug("[eBPF exit event inCache] pid: %d, tgid: %d", event.PID, event.PPID)
if ev.Proc.IsAlive() == false {
procmon.EventsCache.Delete(int(event.PID))
log.Debug("[ebpf exit event] deleting DEAD pid: %d", event.PID)
}

processExitEvent(&event)
}
}
}
Expand All @@ -212,6 +182,9 @@ func event2process(event *execEvent) (proc *procmon.Process) {
proc = procmon.NewProcessEmpty(int(event.PID), byteArrayToString(event.Comm[:]))
proc.UID = int(event.UID)
// trust process path received from kernel
// NOTE: this is the absolute path executed, but no the real path to the binary.
// if it's executed from a chroot, the absolute path willa be /chroot/path/usr/bin/blabla
// if it's from a container, the absolute path will be /proc/<pid>/root/usr/bin/blabla
path := byteArrayToString(event.Filename[:])
if path != "" {
proc.SetPath(path)
Expand All @@ -228,7 +201,45 @@ func event2process(event *execEvent) (proc *procmon.Process) {
} else {
proc.ReadCmdline()
}
proc.GetParent()
proc.BuildTree()
proc.ReadCwd()
proc.ReadEnv()
log.Debug("[eBPF exec event] ppid: %d, pid: %d, %s -> %s", event.PPID, event.PID, proc.Path, proc.Args)

return
}

func processExecEvent(event *execEvent) {
proc := event2process(event)
if proc == nil {
return
}
// TODO: store multiple executions with the same pid but different paths:
// forks, execves... execs from chroots, containers, etc.
if item, needsUpdate, found := procmon.EventsCache.IsInStore(int(event.PID), proc); found {
if needsUpdate {
// when a process is replaced in memory, it'll be found in cache by PID,
// but the new process's details will be empty
proc.Parent = item.Proc
procmon.EventsCache.ComputeChecksums(proc)
procmon.EventsCache.UpdateItem(proc)
}
log.Debug("[eBPF event inCache] -> %d, %v", event.PID, item.Proc.Checksums)
return
}
procmon.EventsCache.Add(proc)
}

func processExitEvent(event *execEvent) {
log.Debug("[eBPF exit event] pid: %d, ppid: %d", event.PID, event.PPID)
ev, _, found := procmon.EventsCache.IsInStore(int(event.PID), nil)
if !found {
return
}
log.Debug("[eBPF exit event inCache] pid: %d, tgid: %d", event.PID, event.PPID)
if ev.Proc.IsAlive() == false {
procmon.EventsCache.Delete(int(event.PID))
log.Debug("[ebpf exit event] deleting DEAD pid: %d", event.PID)
}
}
22 changes: 21 additions & 1 deletion daemon/procmon/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func NewProcess(pid int, comm string) *Process {
}
p.GetDetails()
p.GetParent()
p.GetTree()
p.BuildTree()

return p
}
Expand All @@ -176,6 +176,26 @@ func NewProcessWithParent(pid, ppid int, comm string) *Process {
return p
}

// Lock locks this process for w+r
func (p *Process) Lock() {
p.mu.Lock()
}

// Unlock unlocks reading from this process
func (p *Process) Unlock() {
p.mu.Unlock()
}

// RLock locks this process for r
func (p *Process) RLock() {
p.mu.RLock()
}

// RUnlock unlocks reading from this process
func (p *Process) RUnlock() {
p.mu.RUnlock()
}

//Serialize transforms a Process object to gRPC protocol object
func (p *Process) Serialize() *protocol.Process {
ioStats := p.IOStats
Expand Down
7 changes: 6 additions & 1 deletion daemon/rule/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,9 +319,14 @@ func (o *Operator) Match(con *conman.Connection, hasChecksums bool) bool {
if !hasChecksums {
return ret
}
con.Process.RLock()
for algo := range con.Process.Checksums {
return o.cb(con.Process.Checksums[algo])
ret = o.cb(con.Process.Checksums[algo])
if ret {
break
}
}
con.Process.RUnlock()
return ret
} else if o.Operand == OpProto {
return o.cb(con.Protocol)
Expand Down
2 changes: 1 addition & 1 deletion daemon/ui/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (c *Client) monitorProcessDetails(pid int, stream protocol.UI_Notifications
p = &newProc
if len(p.Tree) == 0 {
p.GetParent()
p.GetTree()
p.BuildTree()
}
} else {
p = procmon.NewProcess(pid, "")
Expand Down

0 comments on commit 19d376a

Please sign in to comment.