
Conversation

@notque
Contributor

@notque notque commented Nov 19, 2025

An attempt to add disk buffering of audit events, making the RabbitMQ connection non-blocking: disk-backed storage holds events while the connection is down. There was a single downtime this year where the network connection from scaleout had an issue, and a service went down because of it.

We cannot lose events, so we currently just stop the service. But we also have availability requirements.

Thus the idea is to add PVCs to the services and hold events there, with settings configured generously enough to withstand any connectivity issue. I do have the defaults set somewhat small at the moment.

You may dislike this and have a significantly better plan; I'd love to hear it. I just gave it a shot.

@notque notque self-assigned this Nov 19, 2025
@notque notque marked this pull request as draft November 19, 2025 04:55
@notque notque force-pushed the audit_backing_store branch from d1887ee to 6e065d1 Compare November 19, 2025 05:37
@notque notque requested a review from Copilot November 19, 2025 05:39
Copilot finished reviewing on behalf of notque November 19, 2025 05:42

Copilot AI left a comment


Pull Request Overview

This PR adds disk-based buffering for audit events to prevent service disruption during RabbitMQ outages. The implementation introduces a BackingStore interface with a file-based implementation that persists events to disk when RabbitMQ is unavailable, then drains them when connectivity is restored.
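For orientation, here is a minimal sketch of what such an interface plausibly looks like. The `Write`, `ReadBatch`, and `UpdateMetrics` methods all appear in the review comments below, but the exact signatures here are assumptions, not the PR's verbatim API:

```go
// Sketch only: method set inferred from the discussion below.
type BackingStore interface {
	// Write persists one event that could not be delivered to RabbitMQ.
	Write(event cadf.Event) error
	// ReadBatch returns up to limit of the oldest buffered events for draining.
	ReadBatch(limit int) ([]cadf.Event, error)
	// UpdateMetrics refreshes the store's Prometheus gauges.
	UpdateMetrics() error
}
```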

Key Changes:

  • New BackingStore interface and FileBackingStore implementation for disk-based event persistence
  • Integration of backing store into auditTrail.Commit() with flow control and periodic draining
  • Configuration support via AuditorOpts with environment variable overrides for backing store path and size limits

Reviewed Changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 13 comments.

| File | Description |
| --- | --- |
| audittools/backing_store.go | Implements file-based backing store with JSONL format, file rotation, size limits, and Prometheus metrics |
| audittools/backing_store_test.go | Tests for backing store basic operations, file rotation, permissions, and size limits |
| audittools/trail.go | Integrates backing store with event processing pipeline, adds flow control and periodic drain mechanism |
| audittools/auditor.go | Adds backing store configuration options and environment variable support |
| audittools/README.md | Documents backing store feature, configuration, behavior, deployment options, and compliance requirements |


@notque notque force-pushed the audit_backing_store branch from 6e065d1 to 8d68fb5 Compare November 19, 2025 06:44
Contributor

@majewsky majewsky left a comment


Dealing with filesystem volumes is going to be a major complication for all of my services, where I do not have volumes at all. I have already considered writing audit events into the DB until they can be submitted, so if we introduce a caching capability at this level, I would like for it to be able to support a DB as backing store, too.

I can see that you have type BackingStore as an interface, so presumably we can provide an SQL-based implementation instead. We don't need to have this implementation as part of this PR, in order to keep it at a manageable size. But what this PR should do is build the public interface in such a way that it allows specifying backing stores other than FileBackingStore, potentially with different types of configuration parameters.

Since we need to support passing configuration via env variables, I suggest something similar to how we pass configuration for Keppel drivers (the Keppel codebase has a full example of this, along with the code that parses it), but basically we could have something like this:

```sh
export ${PREFIX}_BACKING_STORE='{"type":"fs","params":{"path":"/var/cache/audit","max_total_size":1073741824}}'
```

The difference between opts.BackingStoreFactories vs. opts.BackingStore is similar to opts.EnvPrefix vs. opts.ConnectionURL: One allows using the default logic of collecting everything from env vars, one allows the application precise control over where to collect config from.

To allow both the application as well as this library to provide BackingStore implementations, I suggest modeling AuditorOpts like this:

```go
type AuditorOpts struct {
	// Optional. If given, this BackingStore instance will be used directly.
	// If EnvPrefix is given, this will be initialized by reading a JSON payload
	// in the form `{"type":"<type>","params":{...}}` from the environment
	// variable "${PREFIX}_BACKING_STORE".
	BackingStore BackingStore

	// Optional. If given, and the environment contains JSON configuration as described above,
	// a BackingStore constructor will be selected from this set based on the configured type.
	BackingStoreFactories map[string]BackingStoreFactory
}

type BackingStoreFactory func(params json.RawMessage, opts AuditorOpts) (BackingStore, error)

func NewFileBackingStore(params json.RawMessage, opts AuditorOpts) (BackingStore, error) {
	var bsOpts struct {
		Directory    string `json:"path"`
		MaxFileSize  int64  `json:"max_file_size"`
		MaxTotalSize int64  `json:"max_total_size"`
	}
	err := json.Unmarshal(params, &bsOpts)
	if err != nil {
		return nil, fmt.Errorf("while unmarshaling params for FileBackingStore: %w", err)
	}
	registry := opts.Registry
	// ... continue with existing implementation ...
}
```

Then this could be used as:

```go
auditor := must.Return(audittools.NewAuditor(ctx, audittools.AuditorOpts{
	EnvPrefix: "LIMES_AUDIT_RABBITMQ",
	BackingStoreFactories: map[string]audittools.BackingStoreFactory{
		"fs": audittools.NewFileBackingStore,
		"db": func(params json.RawMessage, opts audittools.AuditorOpts) (audittools.BackingStore, error) {
			return newDBBackingStore(dbConnection, params, opts.Registry)
		},
	},
}))
```
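For concreteness, the env-var side of this could then be a small dispatcher along these lines (a sketch; the helper name `backingStoreFromEnv` and the error texts are illustrative, not proposed API):

```go
// Hypothetical helper: reads ${PREFIX}_BACKING_STORE and dispatches to a factory.
func backingStoreFromEnv(envPrefix string, opts AuditorOpts) (BackingStore, error) {
	payload := os.Getenv(envPrefix + "_BACKING_STORE")
	if payload == "" {
		return nil, nil // no backing store configured
	}
	var cfg struct {
		Type   string          `json:"type"`
		Params json.RawMessage `json:"params"`
	}
	if err := json.Unmarshal([]byte(payload), &cfg); err != nil {
		return nil, fmt.Errorf("while parsing %s_BACKING_STORE: %w", envPrefix, err)
	}
	factory, ok := opts.BackingStoreFactories[cfg.Type]
	if !ok {
		return nil, fmt.Errorf("unknown backing store type %q in %s_BACKING_STORE", cfg.Type, envPrefix)
	}
	return factory(cfg.Params, opts)
}
```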

What do you think?

```go
drainTicker := time.NewTicker(1 * time.Minute)
defer drainTicker.Stop()

var pendingEvents []cadf.Event
```
Contributor


I have not read this in detail, but my immediate thought is that pendingEvents is sort-of redundant with the backing store. Both are used as a cache for events that are currently undeliverable. Would it make sense to subsume pendingEvents into the new mechanism, i.e. removing it here and instead adding an InMemoryBackingStore, that acts as a fallback if no other BackingStore is defined?
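For illustration, such a fallback could be as small as the following sketch, written against the assumed Write/ReadBatch interface above (the real store may well delete events only after confirmed delivery rather than on read, and `ErrBackingStoreFull` is the sentinel error assumed in this discussion):

```go
// Hypothetical minimal in-memory fallback: a bounded, mutex-guarded FIFO.
type InMemoryBackingStore struct {
	mu        sync.Mutex
	events    []cadf.Event
	maxEvents int
}

func (s *InMemoryBackingStore) Write(event cadf.Event) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if len(s.events) >= s.maxEvents {
		return ErrBackingStoreFull // assumed sentinel error
	}
	s.events = append(s.events, event)
	return nil
}

func (s *InMemoryBackingStore) ReadBatch(limit int) ([]cadf.Event, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	n := min(limit, len(s.events))
	batch := make([]cadf.Event, n)
	copy(batch, s.events[:n])
	s.events = s.events[n:]
	return batch, nil
}
```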

Contributor Author


Yeah, you are correct. I've made adjustments to handle this correctly.

@notque
Contributor Author

notque commented Nov 21, 2025

> What do you think?

You raise very reasonable points, and ones I expected you'd bring up. I'm not particularly happy with adding volumes to every service, and agree that supporting additional options like a database is reasonable.

I think you understand the situation. I don't claim this is a great achievement, but it's a requirement I cannot avoid, so I am trying to arrive at a solution that involves as little pain as possible.

I did accidentally submit this as a PR ready for review, and then set it back to work in progress. Apologies that you're receiving notifications from it. I am not yet at a stage where it is ready to review. I will push up a lot of changes again tonight, and then also try to incorporate your suggestions into the current state.

I do very much appreciate your stating clearly what you want here, as I'm happy to do whatever I can to not make this terrible for you.

@notque notque force-pushed the audit_backing_store branch from 8d68fb5 to bdd7e07 Compare November 21, 2025 04:02
@github-actions

Merging this branch will increase overall coverage

| Impacted Packages | Coverage Δ | 🤖 |
| --- | --- | --- |
| github.com/sapcc/go-bits/audittools | 41.81% (+41.81%) | 🌟 |

Coverage by file

Changed files (no unit tests)

| Changed File | Coverage Δ | Total | Covered | Missed | 🤖 |
| --- | --- | --- | --- | --- | --- |
| github.com/sapcc/go-bits/audittools/auditor.go | 49.00% (+49.00%) | 1500 (+435) | 735 (+735) | 765 (-300) | 🌟 |
| github.com/sapcc/go-bits/audittools/backing_store.go | 69.23% (+69.23%) | 195 (+195) | 135 (+135) | 60 (+60) | 🌟 |
| github.com/sapcc/go-bits/audittools/backing_store_file.go | 60.45% (+60.45%) | 2655 (+2655) | 1605 (+1605) | 1050 (+1050) | 🌟 |
| github.com/sapcc/go-bits/audittools/backing_store_memory.go | 100.00% (+100.00%) | 555 (+555) | 555 (+555) | 0 | 🌟 |
| github.com/sapcc/go-bits/audittools/backing_store_sql.go | 2.73% (+2.73%) | 1650 (+1650) | 45 (+45) | 1605 (+1605) | 👍 |
| github.com/sapcc/go-bits/audittools/trail.go | 20.90% (+20.90%) | 1005 (+435) | 210 (+210) | 795 (+225) | 🌟 |

Please note that the "Total", "Covered", and "Missed" counts above refer to code statements rather than lines of code. The value in parentheses is the change relative to the old version of the code.

Changed unit test files

  • github.com/sapcc/go-bits/audittools/auditor_test.go
  • github.com/sapcc/go-bits/audittools/backing_store_sql_test.go
  • github.com/sapcc/go-bits/audittools/backing_store_test.go

@notque notque requested a review from Copilot November 21, 2025 04:09
Copilot finished reviewing on behalf of notque November 21, 2025 04:13

Copilot AI left a comment


Pull Request Overview

Copilot reviewed 11 out of 11 changed files in this pull request and generated 11 comments.



Help: "Total number of audit events read from the backing store.",
})
s.sizeGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "audittools_backing_store_size_bytes",

Copilot AI Nov 21, 2025


The audittools_backing_store_size_bytes metric has inconsistent semantics across different backing store implementations:

  • FileBackingStore (line 100): "Current total size of the backing store in bytes" - reports actual byte size
  • InMemoryBackingStore (line 71): "Current number of events in the in-memory backing store" - reports event count
  • SQLBackingStore (line 159): "Current number of events in the SQL backing store" - reports event count

The metric name includes "_bytes" which suggests it should represent size in bytes, but two implementations use it for event count. This makes the metric unusable for unified monitoring across different backing store types.

Consider either:

  1. Renaming to audittools_backing_store_size and documenting that units vary by implementation
  2. Creating separate metrics: audittools_backing_store_size_bytes and audittools_backing_store_events_count
  3. Making all implementations report bytes (though this may be less useful for in-memory/SQL stores)
Suggested change

```diff
- Name: "audittools_backing_store_size_bytes",
+ Name: "audittools_backing_store_events_count",
```

* `audittools_backing_store_reads_total`: Total number of audit events read from the backing store.
* `audittools_backing_store_size_bytes`: Current size of the backing store.
  * File-based: Total size in bytes
  * In-memory: Number of events

Copilot AI Nov 21, 2025


The metrics documentation (lines 231-233) doesn't mention the SQL backing store's metric semantics. The SQL implementation also uses audittools_backing_store_size_bytes to report event count (not bytes), same as the in-memory store.

Update the documentation to include:

*   `audittools_backing_store_size_bytes`: Current size of the backing store.
    *   File-based: Total size in bytes
    *   In-memory: Number of events
    *   SQL: Number of events
Suggested change

```diff
 * In-memory: Number of events
+* SQL: Number of events
```

Comment on lines +4 to +18
```sql
-- SQL Backing Store Migration
--
-- This migration creates the required table for the SQL backing store.
-- The table will be created automatically by the backing store if skip_migration is false,
-- but you can also run this migration manually if you prefer to manage schema separately.

CREATE TABLE IF NOT EXISTS audit_events (
  id BIGSERIAL PRIMARY KEY,
  event_data JSONB NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Index for efficient FIFO reads (oldest events first)
CREATE INDEX IF NOT EXISTS audit_events_created_at_id_idx
  ON audit_events (created_at, id);
```

Copilot AI Nov 21, 2025


[nitpick] The migration SQL file uses a hardcoded table name audit_events, but the SQLBackingStore supports configurable table names via the table_name parameter. This creates a mismatch for users who want to use custom table names.

Consider either:

  1. Adding a note that this migration is only for the default table name and users with custom names should adapt it
  2. Providing a template with placeholders like ${TABLE_NAME} and instructions on how to substitute them
  3. Recommending users rely on automatic migration for custom table names
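On option 3, the automatic migration presumably amounts to executing the statements above at startup. A sketch using database/sql (the helper name `ensureSchema` is hypothetical, and `tableName` must be validated as a safe identifier before interpolation):

```go
// Hypothetical sketch of the automatic migration path (skip_migration=false).
// tableName is interpolated directly into SQL, so it must be pre-validated
// against a strict identifier pattern to avoid SQL injection.
func ensureSchema(db *sql.DB, tableName string) error {
	stmts := []string{
		fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %[1]s (
			id BIGSERIAL PRIMARY KEY,
			event_data JSONB NOT NULL,
			created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
		)`, tableName),
		fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %[1]s_created_at_id_idx ON %[1]s (created_at, id)`, tableName),
	}
	for _, stmt := range stmts {
		if _, err := db.Exec(stmt); err != nil {
			return fmt.Errorf("audittools: schema migration failed: %w", err)
		}
	}
	return nil
}
```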

Comment on lines +88 to +104
```go
	if !sendEvent(&e) {
		if err := t.BackingStore.Write(e); err != nil {
			logg.Error("audittools: failed to write to backing store: %s", err.Error())
			// Backing store is likely full. Apply backpressure to prevent data loss.
			backingStoreFull = true
		}
	}
case <-metricsTicker.C:
	if err := t.BackingStore.UpdateMetrics(); err != nil {
		logg.Error("audittools: failed to update backing store metrics: %s", err.Error())
	}
case <-drainTicker.C:
	// Drain backing store and resume reading from EventSink if successful
	drained := t.drainBackingStore(sendEvent)
	if drained && backingStoreFull {
		backingStoreFull = false
	}
```

Copilot AI Nov 21, 2025


The backpressure mechanism at lines 89-93 sets backingStoreFull = true for ANY write error, not just ErrBackingStoreFull. This means transient errors (I/O errors, permissions issues, etc.) will trigger backpressure and stop processing new events.

The flag is only cleared at lines 102-103 if at least one batch was successfully drained. However, if the write error was transient and the backing store is empty (or becomes empty before the next drain cycle), the flag will never be cleared, causing permanent backpressure.

Consider:

  1. Only setting the flag for ErrBackingStoreFull specifically: if errors.Is(err, ErrBackingStoreFull)
  2. Adding a retry mechanism or periodic flag reset after successful operations
  3. Attempting a test write during drain to verify the store is still full before keeping the flag set
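A sketch of the first option, reusing the variables from the hunk above (`ErrBackingStoreFull` is the sentinel error this comment assumes the library exports):

```go
// Only a definitively-full store should trigger backpressure;
// other write errors are logged but do not stop intake of new events.
if !sendEvent(&e) {
	if err := t.BackingStore.Write(e); err != nil {
		logg.Error("audittools: failed to write to backing store: %s", err.Error())
		if errors.Is(err, ErrBackingStoreFull) {
			backingStoreFull = true
		}
	}
}
```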

```go
var event cadf.Event
if err := json.Unmarshal(eventData, &event); err != nil {
	s.errorCounter.WithLabelValues("read_unmarshal").Inc()
	// Skip corrupted events and continue
```

Copilot AI Nov 21, 2025


Corrupted events that fail to unmarshal are skipped but never deleted from the database. This means they will be re-read in every subsequent ReadBatch() call, causing an infinite loop of failed unmarshaling attempts and preventing the system from draining the backing store.

Unlike the file-based backing store which has dead-letter file handling (lines 303-304 in backing_store_file.go), the SQL backing store has no mechanism to handle or remove corrupted events.

Consider adding similar dead-letter handling for SQL, such as:

  1. Moving corrupted events to a separate dead-letter table
  2. Deleting corrupted events after logging them
  3. Updating a corrupted flag in the existing table to skip them in future reads
Suggested change

```diff
-// Skip corrupted events and continue
+// Log and delete corrupted events to avoid infinite re-reading
+_ = s.deleteEventByID(id)
```
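Option 1 could look roughly like this (the `audit_events_dead_letter` table and the helper are hypothetical; note the payload failed JSON parsing, so the dead-letter column should be BYTEA or TEXT rather than JSONB):

```go
// Hypothetical: move a corrupted row aside so ReadBatch does not re-read it forever.
func (s *SQLBackingStore) deadLetterEvent(id int64, raw []byte) error {
	tx, err := s.db.Begin()
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op once Commit has succeeded

	// raw failed json.Unmarshal, so store it as opaque bytes, not JSONB.
	if _, err := tx.Exec(`INSERT INTO audit_events_dead_letter (event_data) VALUES ($1)`, raw); err != nil {
		return err
	}
	if _, err := tx.Exec(`DELETE FROM audit_events WHERE id = $1`, id); err != nil {
		return err
	}
	return tx.Commit()
}
```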

Comment on lines +101 to +123
**SQL/Database Backing Store**:
```go
auditor, err := audittools.NewAuditor(audittools.AuditorOpts{
	EnvPrefix:          "MYSERVICE_AUDIT",
	BackingStoreConfig: `{"type":"sql","params":{"dsn":"postgres://user:pass@localhost/mydb?sslmode=require","max_events":10000}}`,
})
```

**File-Based Backing Store**:
```go
auditor, err := audittools.NewAuditor(audittools.AuditorOpts{
	EnvPrefix:          "MYSERVICE_AUDIT",
	BackingStoreConfig: `{"type":"file","params":{"directory":"/var/lib/myservice/audit-buffer","max_total_size":1073741824}}`,
})
```

**In-Memory Backing Store**:
```go
auditor, err := audittools.NewAuditor(audittools.AuditorOpts{
	EnvPrefix:          "MYSERVICE_AUDIT",
	BackingStoreConfig: `{"type":"memory","params":{"max_events":1000}}`,
})
```

Copilot AI Nov 21, 2025


The code examples are missing the required context.Context parameter for NewAuditor. The function signature requires a context as the first parameter.

All three examples should include the context parameter:

```go
auditor, err := audittools.NewAuditor(context.Background(), audittools.AuditorOpts{
	EnvPrefix:          "MYSERVICE_AUDIT",
	BackingStoreConfig: `{"type":"sql","params":{"dsn":"...","max_events":10000}}`,
})
```


```go
b, err := json.Marshal(event)
if err != nil {
	f.Close()
```

Copilot AI Nov 21, 2025


This file handle is open for writing (it comes from OpenFile), and the Close() on this error path may itself fail and silently drop buffered data; that close failure is not handled explicitly.


```go
_, err = f.Write(append(b, '\n'))
if err != nil {
	f.Close()
```

Copilot AI Nov 21, 2025


This file handle is open for writing (it comes from OpenFile), and the Close() on this error path may itself fail and silently drop buffered data; that close failure is not handled explicitly.

Comment on lines +360 to +372
```go
	f.Close()
	return fmt.Errorf("audittools: failed to marshal dead-letter entry: %w", err)
}

_, err = f.Write(append(b, '\n'))
if err != nil {
	f.Close()
	return fmt.Errorf("audittools: failed to write to dead-letter file: %w", err)
}

// Force flush to disk for dead-letter files (critical audit data)
if err := f.Sync(); err != nil {
	f.Close()
```

Copilot AI Nov 21, 2025


This file handle is open for writing (it comes from OpenFile), and the Close() on this error path may itself fail and silently drop buffered data; that close failure is not handled explicitly.

Suggested change

```diff
-	f.Close()
-	return fmt.Errorf("audittools: failed to marshal dead-letter entry: %w", err)
-}
+	closeErr := f.Close()
+	if closeErr != nil {
+		return fmt.Errorf("audittools: failed to marshal dead-letter entry: %v; additionally, failed to close file: %w", err, closeErr)
+	}
+	return fmt.Errorf("audittools: failed to marshal dead-letter entry: %w", err)
+}
 
 _, err = f.Write(append(b, '\n'))
 if err != nil {
-	f.Close()
-	return fmt.Errorf("audittools: failed to write to dead-letter file: %w", err)
-}
+	closeErr := f.Close()
+	if closeErr != nil {
+		return fmt.Errorf("audittools: failed to write to dead-letter file: %v; additionally, failed to close file: %w", err, closeErr)
+	}
+	return fmt.Errorf("audittools: failed to write to dead-letter file: %w", err)
+}
 
 // Force flush to disk for dead-letter files (critical audit data)
 if err := f.Sync(); err != nil {
-	f.Close()
+	closeErr := f.Close()
+	if closeErr != nil {
+		return fmt.Errorf("audittools: failed to sync dead-letter file: %v; additionally, failed to close file: %w", err, closeErr)
+	}
```

Comment on lines +360 to +366
```go
	f.Close()
	return fmt.Errorf("audittools: failed to marshal dead-letter entry: %w", err)
}

_, err = f.Write(append(b, '\n'))
if err != nil {
	f.Close()
```

Copilot AI Nov 21, 2025


This file handle is open for writing (it comes from OpenFile), and the Close() on this error path may itself fail and silently drop buffered data; that close failure is not handled explicitly.

Suggested change

```diff
-	f.Close()
-	return fmt.Errorf("audittools: failed to marshal dead-letter entry: %w", err)
-}
+	closeErr := f.Close()
+	if closeErr != nil {
+		return errors.Join(fmt.Errorf("audittools: failed to marshal dead-letter entry: %w", err), fmt.Errorf("audittools: failed to close dead-letter file: %w", closeErr))
+	}
+	return fmt.Errorf("audittools: failed to marshal dead-letter entry: %w", err)
+}
 
 _, err = f.Write(append(b, '\n'))
 if err != nil {
-	f.Close()
+	closeErr := f.Close()
+	if closeErr != nil {
+		return errors.Join(fmt.Errorf("audittools: failed to write to dead-letter file: %w", err), fmt.Errorf("audittools: failed to close dead-letter file: %w", closeErr))
+	}
```

@notque
Contributor Author

notque commented Nov 21, 2025

@majewsky I've made adjustments based on your comments. Is this remotely reasonable for you? I clearly have more work to do going through the Copilot feedback, but in broad strokes, is this okay?
