6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -122,6 +122,12 @@ v1.11.0-rc.0
- Update the `prometheus.exporter.process` component to get the `remove_empty_groups` option. (@dehaansa)

- Remove unnecessary allocations in `stage.static_labels`. (@kalleep)

- Add `cache_minimum_ttl` argument to the `faro.receiver.sourcemaps` block to optionally remove sourcemaps from the cache after they haven't been used for the specified duration. (@mateagluhak)

- Add `cache_error_cleanup_interval` argument to the `faro.receiver.sourcemaps` block to specify how often cached sourcemap download and parsing errors are cleared from the cache. (@mateagluhak)

- Add `cache_cleanup_check_interval` argument to the `faro.receiver.sourcemaps` block to specify how often to check whether any sourcemaps need to be cleared from the cache. (@mateagluhak)

- Upgrade `beyla.ebpf` from Beyla version v2.2.5 to v2.5.8. The full list of changes can be found in the [Beyla release notes](https://github.com/grafana/beyla/releases/tag/v2.5.2). (@marctc)

25 changes: 19 additions & 6 deletions docs/sources/reference/components/faro/faro.receiver.md
@@ -134,11 +134,15 @@ Configuring the `rate` argument determines how fast the bucket refills, and conf
The `sourcemaps` block configures how to retrieve sourcemaps.
Sourcemaps are then used to transform file and line information from minified code into the file and line information from the original source code.

| Name | Type | Description | Default | Required |
|-------------------------|----------------|--------------------------------------------|---------|----------|
| `download` | `bool` | Whether to download sourcemaps. | `true` | no |
| `download_from_origins` | `list(string)` | Which origins to download sourcemaps from. | `["*"]` | no |
| `download_timeout` | `duration` | Timeout when downloading sourcemaps. | `"1s"` | no |
| Name                            | Type           | Description                                                                                       | Default | Required |
| ------------------------------- | -------------- | ------------------------------------------------------------------------------------------------- | ------- | -------- |
| `cache_cleanup_check_interval`  | `duration`     | How often to check whether any cached sourcemaps need to be cleared from the cache.               | `"30s"` | no       |
| `cache_error_cleanup_interval`  | `duration`     | How often cached sourcemap download and parsing errors are cleared, so the downloads can be retried. | `"1h"`  | no       |
| `cache_minimum_ttl`             | `duration`     | Duration after which an unused sourcemap is removed from the cache.                               | `inf`   | no       |
| `download_from_origins`         | `list(string)` | Which origins to download sourcemaps from.                                                         | `["*"]` | no       |
| `download_timeout`              | `duration`     | Timeout when downloading sourcemaps.                                                               | `"1s"`  | no       |
| `download`                      | `bool`         | Whether to download sourcemaps.                                                                    | `true`  | no       |


When exceptions are sent to the `faro.receiver` component, it can download sourcemaps from the web application.
You can disable this behavior by setting the `download` argument to `false`.
@@ -151,6 +155,15 @@ The `*` character indicates a wildcard.
By default, sourcemap downloads are subject to a timeout of `"1s"`, specified by the `download_timeout` argument.
Setting `download_timeout` to `"0s"` disables timeouts.

By default, sourcemaps are held in memory indefinitely.
You can set `cache_minimum_ttl` to remove sourcemaps from the cache when they haven't been used for the specified duration.

By default, if an error occurs while downloading or parsing a sourcemap, the error is cached.
After the duration specified by `cache_error_cleanup_interval`, all cached errors are cleared so the affected sourcemaps can be downloaded again.

By default, cached sourcemaps are checked for cleanup every 30 seconds.
You can change how often this check runs with the `cache_cleanup_check_interval` argument.

To retrieve sourcemaps from disk instead of the network, specify one or more [`location` blocks][location].
When `location` blocks are provided, they're checked first for sourcemaps before falling back to downloading.

@@ -209,7 +222,7 @@ The template value is replaced with the release value provided by the [Faro Web
* `faro_receiver_request_message_bytes` (histogram): Size (in bytes) of HTTP requests received from clients.
* `faro_receiver_response_message_bytes` (histogram): Size (in bytes) of HTTP responses sent to clients.
* `faro_receiver_inflight_requests` (gauge): Current number of inflight requests.
* `faro_receiver_sourcemap_cache_size` (counter): Number of items in sourcemap cache per origin.
* `faro_receiver_sourcemap_cache_size` (gauge): Number of items in sourcemap cache per origin.
* `faro_receiver_sourcemap_downloads_total` (counter): Total number of sourcemap downloads performed per origin and status.
* `faro_receiver_sourcemap_file_reads_total` (counter): Total number of sourcemap retrievals using the filesystem per origin and status.

21 changes: 14 additions & 7 deletions internal/component/faro/receiver/arguments.go
@@ -3,6 +3,7 @@ package receiver
import (
"encoding"
"fmt"
"math"
"time"

"github.com/alecthomas/units"
@@ -71,17 +72,23 @@ func (r *RateLimitingArguments) SetToDefault() {
// SourceMapsArguments configures how app_agent_receiver will retrieve source
// maps for transforming stack traces.
type SourceMapsArguments struct {
Download bool `alloy:"download,attr,optional"`
DownloadFromOrigins []string `alloy:"download_from_origins,attr,optional"`
DownloadTimeout time.Duration `alloy:"download_timeout,attr,optional"`
Locations []LocationArguments `alloy:"location,block,optional"`
Download bool `alloy:"download,attr,optional"`
DownloadFromOrigins []string `alloy:"download_from_origins,attr,optional"`
DownloadTimeout time.Duration `alloy:"download_timeout,attr,optional"`
CacheMinimumTtl time.Duration `alloy:"cache_minimum_ttl,attr,optional"`
CacheErrorCleanupInterval time.Duration `alloy:"cache_error_cleanup_interval,attr,optional"`
CacheCleanupCheckInterval time.Duration `alloy:"cache_cleanup_check_interval,attr,optional"`
Locations []LocationArguments `alloy:"location,block,optional"`
}

func (s *SourceMapsArguments) SetToDefault() {
*s = SourceMapsArguments{
Download: true,
DownloadFromOrigins: []string{"*"},
DownloadTimeout: time.Second,
Download: true,
DownloadFromOrigins: []string{"*"},
DownloadTimeout: time.Second,
CacheErrorCleanupInterval: time.Hour,
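// math.MaxInt64 acts as a "never expire" sentinel: sourcemaps stay cached until explicitly cleaned up.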
CacheMinimumTtl: time.Duration(math.MaxInt64),
CacheCleanupCheckInterval: time.Second * 30,
}
}

19 changes: 17 additions & 2 deletions internal/component/faro/receiver/receiver.go
@@ -131,13 +131,28 @@ func (c *Component) Update(args component.Arguments) error {

c.handler.Update(newArgs.Server)

c.lazySourceMaps.SetInner(newSourceMapsStore(
innerStore := newSourceMapsStore(
log.With(c.log, "subcomponent", "handler"),
newArgs.SourceMaps,
c.sourceMapsMetrics,
nil, // Use default HTTP client.
nil, // Use default FS implementation.
))
)
c.lazySourceMaps.SetInner(innerStore)

go func(s *sourceMapsStoreImpl) {
for {
time.Sleep(newArgs.SourceMaps.CacheCleanupCheckInterval)
s.CleanOldCacheEntries()
}
}(innerStore)

go func(s *sourceMapsStoreImpl) {
for {
time.Sleep(newArgs.SourceMaps.CacheErrorCleanupInterval)
s.CleanCachedErrors()
}
}(innerStore)
Reviewer comment (Contributor):

This would leak goroutines and memory because it's scheduled on every update without a way to exit.
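A minimal sketch of one way to address this, assuming the cleanup loops run under a cancellable context that `Update` cancels before scheduling replacements. The `startCleanupLoops` and `runPeriodically` helpers and the `cancelCleanup` field are illustrative names, not existing code in this PR; the sketch assumes `context` and `time` are imported.

```go
// Sketch only: run both cleanup loops under a context so they exit when the
// component shuts down or when Update replaces the sourcemap store.
func startCleanupLoops(ctx context.Context, s *sourceMapsStoreImpl, args SourceMapsArguments) {
	go runPeriodically(ctx, args.CacheCleanupCheckInterval, s.CleanOldCacheEntries)
	go runPeriodically(ctx, args.CacheErrorCleanupInterval, s.CleanCachedErrors)
}

// runPeriodically calls fn every interval until ctx is cancelled.
func runPeriodically(ctx context.Context, interval time.Duration, fn func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			fn()
		}
	}
}
```

In `Update`, the previously started loops would then be stopped before new ones are scheduled, for example:

```go
// Sketch only: cancelCleanup is a hypothetical field on Component.
if c.cancelCleanup != nil {
	c.cancelCleanup()
}
ctx, cancel := context.WithCancel(context.Background())
c.cancelCleanup = cancel
startCleanupLoops(ctx, innerStore, newArgs.SourceMaps)
```

Alternatively, the loops could be started once from the component's `Run(ctx)` and pick up new settings on each update; either way, the key point is that they observe a cancellation signal instead of looping forever.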
c.logs.SetReceivers(newArgs.Output.Logs)
c.traces.SetConsumers(newArgs.Output.Traces)
83 changes: 65 additions & 18 deletions internal/component/faro/receiver/sourcemaps.go
@@ -13,6 +13,7 @@ import (
"strings"
"sync"
"text/template"
"time"

"github.com/go-kit/log"
"github.com/go-sourcemap/sourcemap"
@@ -67,14 +68,14 @@ func (fs osFileService) ReadFile(name string) ([]byte, error) {
}

type sourceMapMetrics struct {
cacheSize *prometheus.CounterVec
cacheSize *prometheus.GaugeVec
downloads *prometheus.CounterVec
fileReads *prometheus.CounterVec
}

func newSourceMapMetrics(reg prometheus.Registerer) *sourceMapMetrics {
m := &sourceMapMetrics{
cacheSize: prometheus.NewCounterVec(prometheus.CounterOpts{
cacheSize: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "faro_receiver_sourcemap_cache_size",
Help: "number of items in source map cache, per origin",
}, []string{"origin"}),
@@ -88,7 +89,7 @@ func newSourceMapMetrics(reg prometheus.Registerer) *sourceMapMetrics {
}, []string{"origin", "status"}),
}

m.cacheSize = util.MustRegisterOrGet(reg, m.cacheSize).(*prometheus.CounterVec)
m.cacheSize = util.MustRegisterOrGet(reg, m.cacheSize).(*prometheus.GaugeVec)
m.downloads = util.MustRegisterOrGet(reg, m.downloads).(*prometheus.CounterVec)
m.fileReads = util.MustRegisterOrGet(reg, m.fileReads).(*prometheus.CounterVec)
return m
@@ -99,6 +100,16 @@ type sourcemapFileLocation struct {
pathTemplate *template.Template
}

type timeSource interface {
Now() time.Time
}

type realTimeSource struct{}

func (realTimeSource) Now() time.Time {
return time.Now()
}

type sourceMapsStoreImpl struct {
log log.Logger
cli httpClient
@@ -107,8 +118,14 @@ type sourceMapsStoreImpl struct {
metrics *sourceMapMetrics
locs []*sourcemapFileLocation

cacheMut sync.Mutex
cache map[string]*sourcemap.Consumer
cacheMut sync.Mutex
cache map[string]*cachedSourceMap
timeSource timeSource
}

type cachedSourceMap struct {
consumer *sourcemap.Consumer
lastUsed time.Time
}

// newSourceMapStore creates an implementation of sourceMapsStore. The returned
Expand Down Expand Up @@ -141,27 +158,29 @@ func newSourceMapsStore(log log.Logger, args SourceMapsArguments, metrics *sourc
}

return &sourceMapsStoreImpl{
log: log,
cli: cli,
fs: fs,
args: args,
cache: make(map[string]*sourcemap.Consumer),
metrics: metrics,
locs: locs,
log: log,
cli: cli,
fs: fs,
args: args,
cache: make(map[string]*cachedSourceMap),
metrics: metrics,
locs: locs,
timeSource: realTimeSource{},
}
}

func (store *sourceMapsStoreImpl) GetSourceMap(sourceURL string, release string) (*sourcemap.Consumer, error) {
// TODO(rfratto): GetSourceMap is weak to transient errors, since it always
// caches the result, even when there's an error. This means that transient
// errors will be cached forever, preventing source maps from being retrieved.

store.cacheMut.Lock()
defer store.cacheMut.Unlock()

cacheKey := fmt.Sprintf("%s__%s", sourceURL, release)
if sm, ok := store.cache[cacheKey]; ok {
return sm, nil
if cached, ok := store.cache[cacheKey]; ok {
if cached != nil {
cached.lastUsed = store.timeSource.Now()
return cached.consumer, nil
}
return nil, nil
}

content, sourceMapURL, err := store.getSourceMapContent(sourceURL, release)
@@ -177,11 +196,39 @@ func (store *sourceMapsStoreImpl) GetSourceMap(sourceURL string, release string)
return nil, err
}
level.Info(store.log).Log("msg", "successfully parsed source map", "url", sourceMapURL, "release", release)
store.cache[cacheKey] = consumer
store.cache[cacheKey] = &cachedSourceMap{
consumer: consumer,
lastUsed: store.timeSource.Now(),
}
store.metrics.cacheSize.WithLabelValues(getOrigin(sourceURL)).Inc()
return consumer, nil
}

func (store *sourceMapsStoreImpl) CleanOldCacheEntries() {
store.cacheMut.Lock()
defer store.cacheMut.Unlock()

for key, cached := range store.cache {
if cached != nil && cached.lastUsed.Before(store.timeSource.Now().Add(-store.args.CacheMinimumTtl)) {
srcUrl := strings.SplitN(key, "__", 2)[0]
origin := getOrigin(srcUrl)
store.metrics.cacheSize.WithLabelValues(origin).Dec()
delete(store.cache, key)
}
}
}

func (store *sourceMapsStoreImpl) CleanCachedErrors() {
store.cacheMut.Lock()
defer store.cacheMut.Unlock()

for key, cached := range store.cache {
if cached == nil {
delete(store.cache, key)
}
}
}
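
The injectable `timeSource` makes TTL eviction testable without real sleeps. Below is a minimal in-package test sketch under that assumption; the `fakeTimeSource` type and the test itself are illustrative, not part of this PR, and assume imports of `testing`, `time`, `github.com/go-kit/log`, and `github.com/prometheus/client_golang/prometheus`.

```go
// Sketch only: a controllable clock for deterministic cache-eviction tests.
type fakeTimeSource struct{ now time.Time }

func (f *fakeTimeSource) Now() time.Time { return f.now }

func TestCleanOldCacheEntries_EvictsUnusedEntries(t *testing.T) {
	clock := &fakeTimeSource{now: time.Now()}
	store := &sourceMapsStoreImpl{
		log:        log.NewNopLogger(),
		args:       SourceMapsArguments{CacheMinimumTtl: time.Hour},
		metrics:    newSourceMapMetrics(prometheus.NewRegistry()),
		cache:      make(map[string]*cachedSourceMap),
		timeSource: clock,
	}

	// Only lastUsed matters for eviction, so a nil consumer is enough here.
	store.cache["https://example.com/app.js__1.0.0"] = &cachedSourceMap{lastUsed: clock.now}

	// Advance the fake clock past the TTL and trigger the cleanup pass.
	clock.now = clock.now.Add(2 * time.Hour)
	store.CleanOldCacheEntries()

	if len(store.cache) != 0 {
		t.Fatalf("expected expired entry to be removed, cache still has %d entries", len(store.cache))
	}
}
```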

func (store *sourceMapsStoreImpl) getSourceMapContent(sourceURL string, release string) (content []byte, sourceMapURL string, err error) {
// Attempt to find the source map in the filesystem first.
for _, loc := range store.locs {