Commit 8d68fb5

feat: add backing store for disk buffering of events

1 parent 40e7a0c
File tree

6 files changed (+1051, -4 lines)

audittools/README.md

Lines changed: 142 additions & 0 deletions
@@ -0,0 +1,142 @@
<!-- SPDX-FileCopyrightText: 2025 SAP SE or an SAP affiliate company
SPDX-License-Identifier: Apache-2.0
-->

# audittools

`audittools` provides a standard interface for generating and sending CADF (Cloud Auditing Data Federation) audit events to a RabbitMQ message broker.

## Certification Requirements (PCI DSS, SOC 2, and more)

As a cloud provider subject to strict audits (including PCI DSS and SOC 2), we must ensure the **completeness** and **integrity** of audit logs while maintaining service **availability**.

### Standard Production Configuration

**You MUST configure a Backing Store with Persistent Storage (PVC).**

* **Configuration**: Set `BackingStorePath` to a mount point backed by a PVC.
* **Requirement**: This ensures that audit events are preserved even in double-failure scenarios (RabbitMQ outage + Pod crash/reschedule).
* **Compliance**: Satisfies requirements for guaranteed event delivery and audit trail completeness.
### Non-Compliant Configurations

The following configurations are available for development or specific edge cases but are **NOT** recommended for production services subject to audit:

1. **Ephemeral Storage (emptyDir)**:
   * *Risk*: Data loss if the Pod is rescheduled during a RabbitMQ outage.
   * *Status*: **Development / Testing Only**.

2. **No Backing Store**:
   * *Behavior*: The service will **block** (stop processing requests) if the RabbitMQ broker is down and the memory buffer fills up.
   * *Risk*: Service downtime (violation of availability targets).
   * *Status*: **Not Recommended**. Only acceptable if service downtime is preferred over *any* storage complexity.
## Usage

### Basic Setup

To use `audittools`, you typically initialize an `Auditor` with your RabbitMQ connection details.

```go
import (
	"context"
	"log"

	"github.com/sapcc/go-bits/audittools"
)

func main() {
	ctx := context.Background()
	// ...
	auditor, err := audittools.NewAuditor(ctx, audittools.AuditorOpts{
		EnvPrefix: "MYSERVICE_AUDIT", // connection settings are read from env vars like MYSERVICE_AUDIT_QUEUE_NAME
	})
	if err != nil {
		log.Fatal(err)
	}
	// ...
}
```
### Sending Events

```go
// cadf is "github.com/sapcc/go-api-declarations/cadf"
event := cadf.Event{
	// ... fill in event details ...
}
auditor.Record(event)
```
## Disk-Based Buffering

`audittools` includes buffering to ensure audit events are not lost if the RabbitMQ broker becomes unavailable. Events are temporarily written to disk and replayed once the connection is restored.

### Configuration

Buffering is enabled by providing a `BackingStorePath`.

```go
auditor, err := audittools.NewAuditor(ctx, audittools.AuditorOpts{
	EnvPrefix:        "MYSERVICE_AUDIT",
	BackingStorePath: "/var/lib/myservice/audit-buffer",
})
```

Or via environment variables:

* `MYSERVICE_AUDIT_BACKING_STORE_PATH`: Directory in which to store buffered events.
* `MYSERVICE_AUDIT_BACKING_STORE_MAX_TOTAL_SIZE`: (Optional) Maximum total size of the buffer in bytes.
### Kubernetes Deployment

If running in Kubernetes, you have two main options for the backing store:

1. **Persistent Storage (PVC) - Recommended for Audit Compliance**:
   * Mount a Persistent Volume Claim (PVC) at the `BackingStorePath`.
   * **Pros**: Data survives Pod deletion, rescheduling, and rolling updates.
   * **Cons**: Adds complexity (volume management, access modes).
   * **Use Case**: **Required** for strict audit compliance to ensure no data is lost even if the service instance fails during a broker outage.

2. **Ephemeral Storage (emptyDir)**:
   * Mount an `emptyDir` volume at the `BackingStorePath`.
   * **Pros**: Simple, fast, no persistent volume management.
   * **Cons**: Data is lost if the *Pod* is deleted or rescheduled. However, it survives container restarts within the same Pod.
   * **Use Case**: Suitable for non-critical environments or where occasional data loss during complex failure scenarios (simultaneous broker outage + Pod rescheduling) is acceptable.
### Behavior

The system transitions through the following states to ensure zero data loss (a minimal sketch of this write path follows the list):

1. **Normal Operation**: Events are sent directly to RabbitMQ.
2. **RabbitMQ Outage**: Events are written to the disk backing store. The application continues without blocking.
3. **Disk Full**: If the backing store reaches `BackingStoreMaxTotalSize`, writes fail. Events are buffered in memory (up to 20).
4. **Fail-Closed**: If the memory buffer fills up, `auditor.Record()` **blocks**. This pauses the application to prevent data loss.
5. **Recovery**: A background routine continuously drains the backing store to RabbitMQ once it becomes available. New events are persisted to disk during draining to prevent blocking.
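To make this concrete, here is a minimal, self-contained sketch of the fallback cascade. It is an illustration only, not the actual `audittools` internals: `auditEvent`, `eventStore`, and `submit` are hypothetical stand-ins, and the real library drives these steps asynchronously from its delivery goroutine rather than inline in `Record()`.

```go
package main

import "fmt"

// auditEvent stands in for cadf.Event and eventStore for the disk backing
// store; both are simplified, hypothetical types for this sketch.
type auditEvent struct{ Action, Outcome string }

type eventStore interface {
	WriteEvent(auditEvent) error
}

// submit sketches the ordering described above: broker first, then disk,
// then a bounded in-memory buffer that blocks when full (fail-closed).
func submit(ev auditEvent, publish func(auditEvent) error, store eventStore, memBuf chan auditEvent) {
	if err := publish(ev); err == nil {
		return // 1. normal operation: delivered to RabbitMQ
	}
	if store != nil {
		if err := store.WriteEvent(ev); err == nil {
			return // 2. broker outage: event persisted to the backing store
		}
	}
	// 3./4. disk full or no backing store: buffer in memory; once the channel
	// (capacity 20 in audittools) is full, this send blocks until space frees up.
	memBuf <- ev
}

func main() {
	memBuf := make(chan auditEvent, 20)
	brokerDown := func(auditEvent) error { return fmt.Errorf("broker unreachable") }
	submit(auditEvent{Action: "create", Outcome: "success"}, brokerDown, nil, memBuf)
	fmt.Println("events held in memory:", len(memBuf)) // -> 1
}
```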
**Additional Details**:

* **Security**: The directory is created with `0700` permissions, and files with `0600`, ensuring only the service user can access the sensitive audit data.
* **Capacity**: If `BackingStoreMaxTotalSize` is configured, the store will reject new writes when full. The limit is approximate and may be exceeded by up to one event's size (typically a few KB) due to the check-then-write sequence. Set the limit with appropriate headroom for your filesystem.
* **Corrupted Event Handling** (see the sketch after this list):
  * Corrupted events encountered during reads are written to dead-letter files (`audit-events-deadletter-*.jsonl`).
  * Dead-letter files contain metadata (timestamp, source file) and the raw corrupted data for investigation.
  * The `corrupted_event` metric is incremented for monitoring.
  * Source files are deleted after processing, even if all events were corrupted (after moving to dead-letter).
  * Dead-letter files should be monitored and investigated to identify data corruption issues.
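For illustration, one line of a dead-letter file could look roughly like the record below. This is a sketch under assumptions: the commit does not show the exact schema, so the field names (`timestamp`, `source_file`, `raw_data`) and the source file name are hypothetical; only the `audit-events-deadletter-*.jsonl` naming and the "metadata plus raw data" content are taken from the description above.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// deadLetterRecord is a hypothetical shape for one JSON-lines entry in an
// audit-events-deadletter-*.jsonl file: context about where the corrupted
// event came from, plus the unparseable payload preserved verbatim.
type deadLetterRecord struct {
	Timestamp  time.Time `json:"timestamp"`   // when the corruption was detected (assumed field name)
	SourceFile string    `json:"source_file"` // backing store file being drained (assumed field name)
	RawData    string    `json:"raw_data"`    // the corrupted line, kept for investigation (assumed field name)
}

func main() {
	rec := deadLetterRecord{
		Timestamp:  time.Now().UTC(),
		SourceFile: "audit-events-000042.jsonl",     // hypothetical source file name
		RawData:    `{"typeURI":"activity","act...`, // truncated/corrupted event payload
	}
	line, _ := json.Marshal(rec)
	fmt.Println(string(line)) // one JSON object per line
}
```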
### Metrics

The backing store exports the following Prometheus metrics (a short sketch of exposing them follows the list):

* `audittools_backing_store_writes_total`: Total number of audit events written to disk.
* `audittools_backing_store_reads_total`: Total number of audit events read from disk.
* `audittools_backing_store_errors_total`: Total number of errors, labeled by operation:
  * `write_stat`: Failed to stat file during rotation check.
  * `write_full`: Backing store is full (exceeds `MaxTotalSize`).
  * `write_open`: Failed to open backing store file for writing.
  * `write_marshal`: Failed to marshal event to JSON.
  * `write_io`: Failed to write event to disk.
  * `write_sync`: Failed to sync (flush) event to disk.
  * `write_close`: Failed to close backing store file.
  * `read_open`: Failed to open backing store file for reading.
  * `read_scan`: Failed to scan backing store file.
  * `corrupted_event`: Encountered corrupted event during read (written to dead-letter).
  * `deadletter_write`: Successfully wrote corrupted event to dead-letter file.
  * `deadletter_write_failed`: Failed to write corrupted event to dead-letter file.
  * `commit_remove`: Failed to remove file after successful processing.
* `audittools_backing_store_size_bytes`: Current total size of the backing store in bytes.
* `audittools_backing_store_files_count`: Current number of files in the backing store.
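If `Registry` is left unset in `AuditorOpts`, the backing store registers these metrics with `prometheus.DefaultRegisterer` (see the `NewAuditor` diff below), so the standard `/metrics` handler picks them up automatically. If you pass a custom registry, it has to be exposed explicitly; the following is a minimal sketch using the standard Prometheus client, assuming your service does not already serve metrics.

```go
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// The same registry that is passed to audittools.NewAuditor via AuditorOpts.Registry.
	registry := prometheus.NewRegistry()

	// Expose it on /metrics so the backing store counters and gauges can be scraped.
	http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
	log.Fatal(http.ListenAndServe(":9090", nil))
}
```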

audittools/auditor.go

Lines changed: 53 additions & 1 deletion
@@ -17,10 +17,12 @@ import (
 	"fmt"
 	"net"
 	"net/url"
+	"os"
 	"strconv"
 	"testing"
 
 	"github.com/prometheus/client_golang/prometheus"
+
 	"github.com/sapcc/go-api-declarations/cadf"
 
 	"github.com/sapcc/go-bits/assert"
@@ -29,6 +31,13 @@ import (
 	"github.com/sapcc/go-bits/osext"
 )
 
+const (
+	// eventBufferSize is the maximum number of events that can be buffered in memory
+	// when the RabbitMQ connection is unavailable and the backing store is full or unavailable.
+	// When this limit is reached, Record() will block to prevent data loss.
+	eventBufferSize = 20
+)
+
 // Auditor is a high-level interface for audit event acceptors.
 // In a real process, use NewAuditor() or NewNullAuditor() depending on whether you have RabbitMQ client credentials.
 // In a test scenario, use NewMockAuditor() to get an assertable mock implementation.
@@ -63,6 +72,14 @@ type AuditorOpts struct {
 	// - "audittools_successful_submissions" (counter, no labels)
 	// - "audittools_failed_submissions" (counter, no labels)
 	Registry prometheus.Registerer
+
+	// Optional. If given, the Auditor will buffer events in this directory when the RabbitMQ server is unreachable.
+	// If EnvPrefix is given, this can also be set via the environment variable "${PREFIX}_BACKING_STORE_PATH".
+	BackingStorePath string
+
+	// Optional. If given, limits the total size of the backing store in bytes.
+	// If EnvPrefix is given, this can also be set via the environment variable "${PREFIX}_BACKING_STORE_MAX_TOTAL_SIZE".
+	BackingStoreMaxTotalSize int64
 }
 
 func (opts AuditorOpts) getConnectionOptions() (rabbitURL url.URL, queueName string, err error) {
@@ -99,6 +116,7 @@ func (opts AuditorOpts) getConnectionOptions() (rabbitURL url.URL, queueName str
 		User: url.UserPassword(username, pass),
 		Path: "/",
 	}
+
 	return rabbitURL, queueName, nil
 }
 
@@ -139,16 +157,50 @@ func NewAuditor(ctx context.Context, opts AuditorOpts) (Auditor, error) {
 		opts.Registry.MustRegister(failureCounter)
 	}
 
+	// read backing store options from environment if EnvPrefix is set
+	if opts.EnvPrefix != "" {
+		if opts.BackingStorePath == "" {
+			opts.BackingStorePath = os.Getenv(opts.EnvPrefix + "_BACKING_STORE_PATH")
+		}
+		if opts.BackingStoreMaxTotalSize == 0 {
+			if val := os.Getenv(opts.EnvPrefix + "_BACKING_STORE_MAX_TOTAL_SIZE"); val != "" {
+				size, err := strconv.ParseInt(val, 10, 64)
+				if err != nil {
+					return nil, fmt.Errorf("audittools: invalid value for %s_BACKING_STORE_MAX_TOTAL_SIZE: %w", opts.EnvPrefix, err)
+				}
+				opts.BackingStoreMaxTotalSize = size
+			}
+		}
+	}
+
 	// spawn event delivery goroutine
 	rabbitURL, queueName, err := opts.getConnectionOptions()
 	if err != nil {
 		return nil, err
 	}
-	eventChan := make(chan cadf.Event, 20)
+
+	var backingStore BackingStore
+	if opts.BackingStorePath != "" {
+		bsRegistry := opts.Registry
+		if bsRegistry == nil {
+			bsRegistry = prometheus.DefaultRegisterer
+		}
+		backingStore, err = NewFileBackingStore(FileBackingStoreOpts{
+			Directory:    opts.BackingStorePath,
+			MaxTotalSize: opts.BackingStoreMaxTotalSize,
+			Registry:     bsRegistry,
+		})
+		if err != nil {
+			return nil, fmt.Errorf("failed to initialize backing store: %w", err)
+		}
+	}
+
+	eventChan := make(chan cadf.Event, eventBufferSize)
 	go auditTrail{
 		EventSink:           eventChan,
 		OnSuccessfulPublish: func() { successCounter.Inc() },
 		OnFailedPublish:     func() { failureCounter.Inc() },
+		BackingStore:        backingStore,
 	}.Commit(ctx, rabbitURL, queueName)
 
 	return &standardAuditor{

audittools/auditor_test.go

Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
// SPDX-FileCopyrightText: 2025 SAP SE or an SAP affiliate company
// SPDX-License-Identifier: Apache-2.0

package audittools

import (
	"context"
	"strings"
	"testing"

	"github.com/prometheus/client_golang/prometheus"
)

func TestNewAuditorInvalidMaxTotalSize(t *testing.T) {
	// Test that an invalid BACKING_STORE_MAX_TOTAL_SIZE value returns an error.
	t.Setenv("TEST_AUDIT_BACKING_STORE_MAX_TOTAL_SIZE", "10GB")

	_, err := NewAuditor(context.Background(), AuditorOpts{
		EnvPrefix: "TEST_AUDIT",
		Observer: Observer{
			TypeURI: "service/test",
			Name:    "test-service",
			ID:      "test-id",
		},
		Registry: prometheus.NewRegistry(),
	})

	if err == nil {
		t.Fatal("expected error for invalid BACKING_STORE_MAX_TOTAL_SIZE, got nil")
	}
	if !strings.Contains(err.Error(), "invalid value for TEST_AUDIT_BACKING_STORE_MAX_TOTAL_SIZE") {
		t.Fatalf("expected error message to contain 'invalid value for TEST_AUDIT_BACKING_STORE_MAX_TOTAL_SIZE', got: %v", err)
	}
}

func TestNewAuditorValidMaxTotalSize(t *testing.T) {
	// Test that a valid BACKING_STORE_MAX_TOTAL_SIZE value works.
	t.Setenv("TEST_AUDIT_BACKING_STORE_MAX_TOTAL_SIZE", "1073741824") // 1 GiB in bytes
	t.Setenv("TEST_AUDIT_QUEUE_NAME", "test-queue")

	tmpDir := t.TempDir()

	auditor, err := NewAuditor(context.Background(), AuditorOpts{
		EnvPrefix:                "TEST_AUDIT",
		BackingStorePath:         tmpDir,
		BackingStoreMaxTotalSize: 0, // will be read from the env var
		Observer: Observer{
			TypeURI: "service/test",
			Name:    "test-service",
			ID:      "test-id",
		},
		Registry: prometheus.NewRegistry(),
	})

	if err != nil {
		t.Fatalf("expected no error, got: %v", err)
	}
	if auditor == nil {
		t.Fatal("expected auditor to be created, got nil")
	}
}

0 commit comments
