Skip to content

Commit

Permalink
procmon/cache improvements
Browse files Browse the repository at this point in the history
 - Fixed several leaks.
 - Cache of events reorganized and improved.
   * items are added faster.
   * proc details are rebuilt if needed (checksums, proc tree, etc)
   * proc's tree is reused if we've got the parent in cache.

rel: #413
  • Loading branch information
gustavo-iniguez-goya committed Dec 12, 2023
1 parent 9efaa37 commit 431e2d3
Show file tree
Hide file tree
Showing 9 changed files with 289 additions and 205 deletions.
5 changes: 2 additions & 3 deletions daemon/procmon/activepids.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,9 @@ func MonitorProcEvents(stop <-chan struct{}) {
proc := NewProcessWithParent(int(ev.PID), int(ev.TGID), "")

log.Debug("[procmon exec event] %d, pid:%d tgid:%d %s, %s -> %s\n", ev.TimeStamp, ev.PID, ev.TGID, proc.Comm, proc.Path, proc.Parent.Path)
if _, needsUpdate, found := EventsCache.IsInStore(int(ev.PID), proc); found {
if item, needsUpdate, found := EventsCache.IsInStore(int(ev.PID), proc); found {
if needsUpdate {
EventsCache.ComputeChecksums(proc)
EventsCache.UpdateItem(proc)
EventsCache.Update(&item.Proc, proc)
}
log.Debug("[procmon exec event inCache] %d, pid:%d tgid:%d\n", ev.TimeStamp, ev.PID, ev.TGID)
continue
Expand Down
280 changes: 156 additions & 124 deletions daemon/procmon/cache_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@ var (
// EventsCache is the cache of processes
EventsCache *EventsStore
eventsCacheTicker *time.Ticker

// When we receive an Exit event, we'll delete it from cache.
// This TTL defines how much time we retain a PID on cache, before we receive
// an Exit event.
pidTTL = 3600 // seconds
// the 2nd cache of items is by path.
//
pathTTL = 3600 * 24 // 1 day
pidTTL = 20 // seconds
)

func init() {
Expand All @@ -37,8 +35,8 @@ type ProcessEvent struct {

// ExecEventItem represents an item of the cache
type ExecEventItem struct {
sync.RWMutex
Proc *Process
//sync.RWMutex
Proc Process
LastSeen int64
TTL int32
}
Expand All @@ -52,9 +50,7 @@ func (e *ExecEventItem) isValid() bool {

//EventsStore is the cache of exec events
type EventsStore struct {
eventByPID map[int]*ExecEventItem
// a path will have multiple pids, hashes will be computed only once by path
eventByPath map[string]*ExecEventItem
eventByPID map[int]ExecEventItem
checksums map[string]uint
mu *sync.RWMutex
checksumsEnabled bool
Expand All @@ -68,10 +64,9 @@ func NewEventsStore() *EventsStore {
eventsCacheTicker = time.NewTicker(10 * time.Second)

return &EventsStore{
mu: &sync.RWMutex{},
checksums: make(map[string]uint, 500),
eventByPID: make(map[int]*ExecEventItem, 500),
eventByPath: make(map[string]*ExecEventItem, 500),
mu: &sync.RWMutex{},
checksums: make(map[string]uint, 500),
eventByPID: make(map[int]ExecEventItem, 500),
}
}

Expand All @@ -80,82 +75,161 @@ func NewEventsStore() *EventsStore {
// or reused existing ones otherwise.
func (e *EventsStore) Add(proc *Process) {
log.Debug("[cache] EventsStore.Add() %d, %s", proc.ID, proc.Path)
// add the item to cache ASAP
// Add the item to cache ASAP,
// then calculate the checksums if needed.
e.UpdateItem(proc)
if e.GetComputeChecksums() {
e.ComputeChecksums(proc)
e.UpdateItem(proc)
if e.ComputeChecksums(proc) {
e.UpdateItem(proc)
}
}
log.Debug("[cache] EventsStore.Add() finished")
}

// UpdateItem updates a cache item
func (e *EventsStore) UpdateItem(proc *Process) {
log.Debug("[cache] updateItem() adding to events store (total: %d), pid: %d, paths: %s", e.Len(), proc.ID, proc.Path)
log.Debug("[cache] updateItem() updating events store (total: %d), pid: %d, path: %s", e.Len(), proc.ID, proc.Path)
if proc.Path == "" {
return
}
e.mu.Lock()
ev := &ExecEventItem{
Proc: proc,
ev := ExecEventItem{
Proc: *proc,
LastSeen: time.Now().UnixNano(),
}
e.eventByPID[proc.ID] = ev
e.eventByPath[proc.Path] = ev
e.mu.Unlock()
}

// IsInStore checks if a PID is in the store.
// If the PID is in cache, we may need to update it if the PID
// is reusing the PID of the parent.
func (e *EventsStore) IsInStore(key int, proc *Process) (item *ExecEventItem, needsUpdate bool, found bool) {
item, found = e.IsInStoreByPID(key)
if !found {
// ReplaceItem replaces an existing process with a new one.
//
// Update calls it when newProc has the same PID as the cached oldProc but a
// different path. newProc is stored in the cache under its own PID, and its
// PPID is set to oldProc's PID.
//
// UpdateItem is called again after every step that may modify newProc.
// NOTE(review): UpdateItem appears to store a copy of the process
// (Proc: *proc), which would explain the repeated calls -- confirm.
func (e *EventsStore) ReplaceItem(oldProc, newProc *Process) {
log.Debug("[event inCache, replacement] new: %d, %s -> inCache: %d -> %s", newProc.ID, newProc.Path, oldProc.ID, oldProc.Path)
// Note: on rare occasions, the process being replaced is the older one.
// if oldProc.Starttime > newProc.Starttime {}
//
// Record the replaced process as the parent PID, then add the item to the
// cache before doing any further (possibly slower) work.
newProc.PPID = oldProc.ID
e.UpdateItem(newProc)

// Only compute the checksums if the new process has none yet.
if newProc.ChecksumsCount() == 0 {
e.ComputeChecksums(newProc)
e.UpdateItem(newProc)
}

// Make sure the replaced process has its parent and tree built, since it may
// be used as the parent of newProc below.
// NOTE(review): newProc is not modified in this branch, so this UpdateItem
// call looks redundant -- confirm.
if len(oldProc.Tree) == 0 {
oldProc.GetParent()
oldProc.BuildTree()
e.UpdateItem(newProc)
}

// TODO: work on improving the process tree (especially with forks/clones*)
// If the new process has no tree, use the replaced process as its parent and
// build the tree from there.
if len(newProc.Tree) == 0 {
newProc.Parent = oldProc
newProc.BuildTree()
e.UpdateItem(newProc)
}
}

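// Update refreshes the cached process oldProc with the information of proc.
// If proc has the same PID but a different path, the cached item is replaced;
// otherwise the process tree is rebuilt or reused when it's missing.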
func (e *EventsStore) Update(oldProc, proc *Process) {
log.Debug("[cache Update old] %d in cache -> %s", oldProc.ID, oldProc.Path)

update := false
updateOld := false

// forked process. Update cache.
// execEvent -> pid: 12345, /usr/bin/exec-wrapper
// execEvent -> pid: 12345, /usr/bin/telnet
if proc != nil && (proc.ID == oldProc.ID && proc.Path != oldProc.Path) {
e.ReplaceItem(oldProc, proc)
return
}
log.Debug("[cache] Event found by PID: %d, %s", key, item.Proc.Path)

if len(oldProc.Tree) == 0 {
oldProc.GetParent()
oldProc.BuildTree()
updateOld = true
}

if proc != nil && (len(oldProc.Tree) > 0 && len(proc.Tree) == 0 && oldProc.ID == proc.ID) {
proc.Tree = oldProc.Tree
update = true
}

if updateOld {
log.Debug("[cache] Update end, updating oldProc: %d, %s, %v", oldProc.ID, oldProc.Path, oldProc.Tree)
e.UpdateItem(oldProc)
}
if update {
log.Debug("[cache] Update end, updating newProc: %d, %s, %v", proc.ID, proc.Path, proc.Tree)
e.UpdateItem(proc)
}
}

func (e *EventsStore) needsUpdate(cachedProc, proc *Process) bool {
cachedProc.RLock()
defer cachedProc.RUnlock()

// check if this PID has replaced the PPID:
// systemd, pid:1234 -> curl, pid:1234 -> curl (i.e.: pid 1234) opens x.x.x.x:443
// Without this, we would display for example "systemd is connecting to x.x.x.x:443",
// instead of "curl is connecting to ..."
// The previous pid+path will still exist as parent of the new child, in proc.Parent
if proc != nil && proc.Path != "" && item.Proc.Path != proc.Path {
log.Debug("[event inCache, replacement] new: %d, %s -> inCache: %d -> %s", proc.ID, proc.Path, item.Proc.ID, item.Proc.Path)
//e.UpdateItem(proc)
needsUpdate = true
if proc != nil && (proc.ID == cachedProc.ID && proc.Path != cachedProc.Path) {
return true
}

return
}
sumsCount := cachedProc.ChecksumsCount()

// IsInStoreByPID checks if a pid exists in cache.
func (e *EventsStore) IsInStoreByPID(key int) (item *ExecEventItem, found bool) {
e.mu.RLock()
item, found = e.eventByPID[key]
e.mu.RUnlock()
return
if proc != nil && sumsCount > 0 && cachedProc.IsAlive() {
return false
}

if cachedProc != nil && sumsCount == 0 {
return true
}

if proc != nil && len(proc.Tree) == 0 {
return true
}
if cachedProc != nil && len(cachedProc.Tree) == 0 {
return true
}

return false
}

// IsInStoreByPath checks if a process exists in cache by path.
func (e *EventsStore) IsInStoreByPath(path string) (item *ExecEventItem, found bool) {
if path == "" || path == KernelConnection {
// IsInStore checks if a PID is in the store.
// If the PID is in cache, the cached item may need to be updated, for example
// when the new process is reusing the PID of its parent.
func (e *EventsStore) IsInStore(key int, proc *Process) (item ExecEventItem, needsUpdate, found bool) {

item, found = e.IsInStoreByPID(key)
if !found {
return
}
e.mu.RLock()
item, found = e.eventByPath[path]
e.mu.RUnlock()
if found {
log.Debug("[cache] event found by path: %s", path)
if found && e.needsUpdate(&item.Proc, proc) {
needsUpdate = true
return
}

log.Debug("[cache] Event found by PID: %d, %s", key, item.Proc.Path)

return
}

// Delete an item from cache
func (e *EventsStore) Delete(key int) {
// IsInStoreByPID checks if a pid exists in cache.
func (e *EventsStore) IsInStoreByPID(key int) (item ExecEventItem, found bool) {
e.mu.Lock()
delete(e.eventByPID, key)
e.mu.Unlock()
defer e.mu.Unlock()
item, found = e.eventByPID[key]

if !found {
return
}

item.LastSeen = time.Now().UnixNano()

return
}

// Len returns the number of items in cache.
Expand All @@ -165,90 +239,48 @@ func (e *EventsStore) Len() int {
return len(e.eventByPID)
}

// DeleteOldItems deletes items that have exceeded the TTL
func (e *EventsStore) DeleteOldItems() {
// Delete schedules an item to be deleted from cache.
func (e *EventsStore) Delete(key int) {
e.mu.Lock()
defer e.mu.Unlock()

log.Debug("[cache] deleting old events, total byPID: %d, byPath: %d", len(e.eventByPID), len(e.eventByPath))
for k, item := range e.eventByPID {
if item.Proc.IsAlive() == false {
log.Debug("[cache] deleting old PID: %d -> %s", k, item.Proc.Path)
delete(e.eventByPID, k)
}
ev, found := e.eventByPID[key]
if !found {
return
}
for path, item := range e.eventByPath {
if item.Proc.IsAlive() == false {
log.Debug("[cache] deleting old path: %d -> %s", item.Proc.ID, item.Proc.Path)
delete(e.eventByPath, path)
}
if !ev.Proc.IsAlive() {
delete(e.eventByPID, key)
}
}

// -------------------------------------------------------------------------
// TODO: Move to its own package.
// A hashing service that runs in the background, accepts paths to hash,
// and returns the hashes for the different (configurable) algorithms.

// ComputeChecksums decides if we need to compute the checksum of a process or not.
// We don't recalculate hashes during the life of the process.
func (e *EventsStore) ComputeChecksums(proc *Process) {
if !e.checksumsEnabled {
return
}
log.Debug("[cache] reuseChecksums %d, %s", proc.ID, proc.Path)

// XXX: why double check if the PID is in cache?
// reuseChecksums is called from Add(), and before calling Add() we check if
// the PID is in cache.
// The problem is that we don't intercept some events (fork, clone*, dup*),
// and because of this sometimes we don't receive the event of the parent.
item, _, found := e.IsInStore(proc.ID, proc)
if !found {
log.Debug("cache.reuseChecksums() %d not inCache, %s", proc.ID, proc.Path)

// if parent path and current path are equal, and the parent is alive, see if we have the hash of the parent path
if !proc.IsChild() {
proc.ComputeChecksums(e.checksums)
log.Debug("[cache] reuseChecksums() pid not in cache, not child of parent: %d, %s - %d - %v", proc.ID, proc.Path, proc.Starttime, proc.Checksums)
return
}

// parent path is nil or paths differ or parent is not alive
// compute new checksums
log.Debug("[cache] reuseChecksums() proc is child, proc: %d, %d, %s parent: %d, %d, %s", proc.Starttime, proc.ID, proc.Path, proc.Parent.Starttime, proc.Parent.ID, proc.Parent.Path)
pit, found := e.IsInStoreByPath(proc.Parent.Path)
if !found {
//log.Info("cache.reuseChecksums() cache.add() pid not found byPath: %d, %s, parent: %d, %s", proc.ID, proc.Path, proc.Parent.ID, proc.Parent.Path)
proc.ComputeChecksums(e.checksums)
return
}
// DeleteOldItems deletes items that have exited and exceeded the TTL.
// Keeping them in cache for a short period of time sometimes helps to
// link some connections to processes.
// Processes that are still alive are not deleted.
func (e *EventsStore) DeleteOldItems() {
e.mu.Lock()
defer e.mu.Unlock()

// if the parent path is in cache reuse the checksums
log.Debug("[cache] reuseChecksums() inCache, found by parent path: %d:%s, parent alive: %v, %d:%s", pit.Proc.ID, pit.Proc.Path, proc.Parent.IsAlive(), proc.Parent.ID, proc.Parent.Path)
if len(pit.Proc.Checksums) == 0 {
proc.ComputeChecksums(e.checksums)
return
log.Debug("[cache] deleting old events, total byPID: %d", len(e.eventByPID))
for k, item := range e.eventByPID {
if !item.isValid() && !item.Proc.IsAlive() {
delete(e.eventByPID, k)
}
log.Debug("[cache] reuseCheckums() reusing checksums: %v", pit.Proc.Checksums)
proc.Checksums = pit.Proc.Checksums
return
}
}

// pid found in cache
// we should check other parameters to see if the pid is really the same process
// proc/<pid>/maps
item.Proc.RLock()
checksumsNum := len(item.Proc.Checksums)
item.Proc.RUnlock()
if checksumsNum > 0 && (item.Proc.IsAlive() && item.Proc.Path == proc.Path) {
log.Debug("[cache] reuseChecksums() cached PID alive, already hashed: %v, %s new: %s", item.Proc.Checksums, item.Proc.Path, proc.Path)
proc.Checksums = item.Proc.Checksums
return
}
log.Debug("[cache] reuseChecksums() PID found inCache, computing hashes: %s new: %s - hashes: |%v<>%v|", item.Proc.Path, proc.Path, item.Proc.Checksums, proc.Checksums)
// ComputeChecksums obtains the checksums of the process.
//
// It returns true when the checksums of proc have been (re)computed, so the
// caller knows that the cached item must be updated. It returns false, without
// touching proc, when:
//   - proc is nil,
//   - checksums are disabled in the store, or
//   - the process is still alive and already has checksums.
//
// The store's read lock is held while hashing, because the set of enabled
// hash algorithms (e.checksums) is read for the whole operation.
func (e *EventsStore) ComputeChecksums(proc *Process) bool {
	e.mu.RLock()
	defer e.mu.RUnlock()

	// Nothing to hash. Checked first so that none of the paths below
	// dereference a nil process.
	if proc == nil {
		return false
	}
	if !e.checksumsEnabled {
		return false
	}
	// We don't recalculate hashes during the life of the process.
	if proc.IsAlive() && proc.ChecksumsCount() > 0 {
		log.Debug("[cache] ComputeChecksums, already hashed: %s -> %v", proc.Path, proc.Checksums)
		return false
	}

	proc.ComputeChecksums(e.checksums)

	return true
}

// AddChecksumHash adds a new hash algorithm to compute checksums
Expand Down Expand Up @@ -279,12 +311,12 @@ func (e *EventsStore) SetComputeChecksums(compute bool) {
if !compute {
for _, item := range e.eventByPID {
// XXX: reset saved checksums? or keep them in cache?
item.Proc.Checksums = make(map[string]string)
item.Proc.ResetChecksums()
}
return
}
for _, item := range e.eventByPID {
if len(item.Proc.Checksums) == 0 {
if item.Proc.ChecksumsCount() == 0 {
item.Proc.ComputeChecksums(e.checksums)
}
}
Expand Down
Loading

0 comments on commit 431e2d3

Please sign in to comment.