Skip to content
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ Main (unreleased)

- Reduce memory overhead of `prometheus.remote_write`'s WAL by bringing in an upstream change to only track series in a slice if there's a hash conflict. (@kgeckhart)

- Add `cache_minimum_ttl` argument to `faro.receiver.sourcemaps` block to optionally specify the duration after which the cache map clears itself if sourcemap was not used for the specified duration (@mateagluhak)

- Add `cache_error_cleanup_interval` argument to `faro.receiver.sourcemaps` block to specify the duration after which the cached sourcemap errors are removed from the cache (@mateagluhak)

- Add `cache_cleanup_check_interval` argument to `faro.receiver.sourcemaps` block to specify how often to check if any sourcemaps need to be cleared from the cache (@mateagluhak)

### Bugfixes

- Update `webdevops/go-common` dependency to resolve concurrent map write panic. (@dehaansa)
Expand Down
22 changes: 16 additions & 6 deletions docs/sources/reference/components/faro/faro.receiver.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,15 @@ Configuring the `rate` argument determines how fast the bucket refills, and conf
The `sourcemaps` block configures how to retrieve sourcemaps.
Sourcemaps are then used to transform file and line information from minified code into the file and line information from the original source code.

| Name | Type | Description | Default | Required |
|-------------------------|----------------|--------------------------------------------|---------|----------|
| `download` | `bool` | Whether to download sourcemaps. | `true` | no |
| `download_from_origins` | `list(string)` | Which origins to download sourcemaps from. | `["*"]` | no |
| `download_timeout` | `duration` | Timeout when downloading sourcemaps. | `"1s"` | no |
| Name | Type | Description | Default | Required |
|------------------------------------|----------------|-----------------------------------------------------------------------------------|---------|----------|
| `download` | `bool` | Whether to download sourcemaps. | `true` | no |
| `download_from_origins` | `list(string)` | Which origins to download sourcemaps from. | `["*"]` | no |
| `download_timeout` | `duration` | Timeout when downloading sourcemaps. | `"1s"` | no |
| `cache_minimum_ttl` | `duration` | Duration after which source map is deleted from cache if not used | `inf` | no |
| `cache_error_cleanup_interval` | `duration` | Duration after which the download of source map that previously failed is retried | `"1h"` | no |
| `cache_cleanup_check_interval` | `duration` | How often should cached sourcemaps be checked for cleanup | `"1h"` | no |


When exceptions are sent to the `faro.receiver` component, it can download sourcemaps from the web application.
You can disable this behavior by setting the `download` argument to `false`.
Expand All @@ -151,6 +155,12 @@ The `*` character indicates a wildcard.
By default, sourcemap downloads are subject to a timeout of `"1s"`, specified by the `download_timeout` argument.
Setting `download_timeout` to `"0s"` disables timeouts.

By default, sourcemaps are held in memory indefinitely. By setting the `cache_minimum_ttl` sourcemaps will be cleared if not used during the specified duration.

By default, if there is an error while downloading or parsing a sourcemap, error gets cached. After duration specified by `cache_error_cleanup_interval`, all errors get cleared from cache.

By default, every 30s cached sourcemaps are checked for cleanup. The frequency of checking can be modified by setting the `cache_cleanup_check_interval` argument.

To retrieve sourcemaps from disk instead of the network, specify one or more [`location` blocks][location].
When `location` blocks are provided, they're checked first for sourcemaps before falling back to downloading.

Expand Down Expand Up @@ -209,7 +219,7 @@ The template value is replaced with the release value provided by the [Faro Web
* `faro_receiver_request_message_bytes` (histogram): Size (in bytes) of HTTP requests received from clients.
* `faro_receiver_response_message_bytes` (histogram): Size (in bytes) of HTTP responses sent to clients.
* `faro_receiver_inflight_requests` (gauge): Current number of inflight requests.
* `faro_receiver_sourcemap_cache_size` (counter): Number of items in sourcemap cache per origin.
* `faro_receiver_sourcemap_cache_size` (gauge): Number of items in sourcemap cache per origin.
* `faro_receiver_sourcemap_downloads_total` (counter): Total number of sourcemap downloads performed per origin and status.
* `faro_receiver_sourcemap_file_reads_total` (counter): Total number of sourcemap retrievals using the filesystem per origin and status.

Expand Down
21 changes: 14 additions & 7 deletions internal/component/faro/receiver/arguments.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package receiver
import (
"encoding"
"fmt"
"math"
"time"

"github.com/alecthomas/units"
Expand Down Expand Up @@ -71,17 +72,23 @@ func (r *RateLimitingArguments) SetToDefault() {
// SourceMapsArguments configures how app_agent_receiver will retrieve source
// maps for transforming stack traces.
type SourceMapsArguments struct {
Download bool `alloy:"download,attr,optional"`
DownloadFromOrigins []string `alloy:"download_from_origins,attr,optional"`
DownloadTimeout time.Duration `alloy:"download_timeout,attr,optional"`
Locations []LocationArguments `alloy:"location,block,optional"`
Download bool `alloy:"download,attr,optional"`
DownloadFromOrigins []string `alloy:"download_from_origins,attr,optional"`
DownloadTimeout time.Duration `alloy:"download_timeout,attr,optional"`
CacheMinimumTtl time.Duration `alloy:"cache_minimum_ttl,attr,optional"`
CacheErrorCleanupInterval time.Duration `alloy:"cache_error_cleanup_interval,attr,optional"`
CacheCleanupCheckInterval time.Duration `alloy:"cache_cleanup_check_interval,attr,optional"`
Locations []LocationArguments `alloy:"location,block,optional"`
}

func (s *SourceMapsArguments) SetToDefault() {
*s = SourceMapsArguments{
Download: true,
DownloadFromOrigins: []string{"*"},
DownloadTimeout: time.Second,
Download: true,
DownloadFromOrigins: []string{"*"},
DownloadTimeout: time.Second,
CacheErrorCleanupInterval: time.Hour,
CacheMinimumTtl: time.Duration(math.MaxInt64),
CacheCleanupCheckInterval: time.Second * 30,
}
}

Expand Down
19 changes: 17 additions & 2 deletions internal/component/faro/receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,28 @@ func (c *Component) Update(args component.Arguments) error {

c.handler.Update(newArgs.Server)

c.lazySourceMaps.SetInner(newSourceMapsStore(
innerStore := newSourceMapsStore(
log.With(c.log, "subcomponent", "handler"),
newArgs.SourceMaps,
c.sourceMapsMetrics,
nil, // Use default HTTP client.
nil, // Use default FS implementation.
))
)
c.lazySourceMaps.SetInner(innerStore)

go func(s *sourceMapsStoreImpl) {
for {
time.Sleep(newArgs.SourceMaps.CacheCleanupCheckInterval)
s.CleanOldCacheEntries()
}
}(innerStore)

go func(s *sourceMapsStoreImpl) {
for {
time.Sleep(newArgs.SourceMaps.CacheErrorCleanupInterval)
s.CleanCachedErrors()
}
}(innerStore)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would leak goroutines and memory because it's scheduled on every update without a way to exit


c.logs.SetReceivers(newArgs.Output.Logs)
c.traces.SetConsumers(newArgs.Output.Traces)
Expand Down
83 changes: 65 additions & 18 deletions internal/component/faro/receiver/sourcemaps.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"strings"
"sync"
"text/template"
"time"

"github.com/go-kit/log"
"github.com/go-sourcemap/sourcemap"
Expand Down Expand Up @@ -67,14 +68,14 @@ func (fs osFileService) ReadFile(name string) ([]byte, error) {
}

type sourceMapMetrics struct {
cacheSize *prometheus.CounterVec
cacheSize *prometheus.GaugeVec
downloads *prometheus.CounterVec
fileReads *prometheus.CounterVec
}

func newSourceMapMetrics(reg prometheus.Registerer) *sourceMapMetrics {
m := &sourceMapMetrics{
cacheSize: prometheus.NewCounterVec(prometheus.CounterOpts{
cacheSize: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "faro_receiver_sourcemap_cache_size",
Help: "number of items in source map cache, per origin",
}, []string{"origin"}),
Expand All @@ -88,7 +89,7 @@ func newSourceMapMetrics(reg prometheus.Registerer) *sourceMapMetrics {
}, []string{"origin", "status"}),
}

m.cacheSize = util.MustRegisterOrGet(reg, m.cacheSize).(*prometheus.CounterVec)
m.cacheSize = util.MustRegisterOrGet(reg, m.cacheSize).(*prometheus.GaugeVec)
m.downloads = util.MustRegisterOrGet(reg, m.downloads).(*prometheus.CounterVec)
m.fileReads = util.MustRegisterOrGet(reg, m.fileReads).(*prometheus.CounterVec)
return m
Expand All @@ -99,6 +100,16 @@ type sourcemapFileLocation struct {
pathTemplate *template.Template
}

type timeSource interface {
Now() time.Time
}

type realTimeSource struct{}

func (realTimeSource) Now() time.Time {
return time.Now()
}

type sourceMapsStoreImpl struct {
log log.Logger
cli httpClient
Expand All @@ -107,8 +118,14 @@ type sourceMapsStoreImpl struct {
metrics *sourceMapMetrics
locs []*sourcemapFileLocation

cacheMut sync.Mutex
cache map[string]*sourcemap.Consumer
cacheMut sync.Mutex
cache map[string]*cachedSourceMap
timeSource timeSource
}

type cachedSourceMap struct {
consumer *sourcemap.Consumer
lastUsed time.Time
}

// newSourceMapStore creates an implementation of sourceMapsStore. The returned
Expand Down Expand Up @@ -141,27 +158,29 @@ func newSourceMapsStore(log log.Logger, args SourceMapsArguments, metrics *sourc
}

return &sourceMapsStoreImpl{
log: log,
cli: cli,
fs: fs,
args: args,
cache: make(map[string]*sourcemap.Consumer),
metrics: metrics,
locs: locs,
log: log,
cli: cli,
fs: fs,
args: args,
cache: make(map[string]*cachedSourceMap),
metrics: metrics,
locs: locs,
timeSource: realTimeSource{},
}
}

func (store *sourceMapsStoreImpl) GetSourceMap(sourceURL string, release string) (*sourcemap.Consumer, error) {
// TODO(rfratto): GetSourceMap is weak to transient errors, since it always
// caches the result, even when there's an error. This means that transient
// errors will be cached forever, preventing source maps from being retrieved.

store.cacheMut.Lock()
defer store.cacheMut.Unlock()

cacheKey := fmt.Sprintf("%s__%s", sourceURL, release)
if sm, ok := store.cache[cacheKey]; ok {
return sm, nil
if cached, ok := store.cache[cacheKey]; ok {
if cached != nil {
cached.lastUsed = store.timeSource.Now()
return cached.consumer, nil
}
return nil, nil
}

content, sourceMapURL, err := store.getSourceMapContent(sourceURL, release)
Expand All @@ -177,11 +196,39 @@ func (store *sourceMapsStoreImpl) GetSourceMap(sourceURL string, release string)
return nil, err
}
level.Info(store.log).Log("msg", "successfully parsed source map", "url", sourceMapURL, "release", release)
store.cache[cacheKey] = consumer
store.cache[cacheKey] = &cachedSourceMap{
consumer: consumer,
lastUsed: store.timeSource.Now(),
}
store.metrics.cacheSize.WithLabelValues(getOrigin(sourceURL)).Inc()
return consumer, nil
}

func (store *sourceMapsStoreImpl) CleanOldCacheEntries() {
store.cacheMut.Lock()
defer store.cacheMut.Unlock()

for key, cached := range store.cache {
if cached != nil && cached.lastUsed.Before(store.timeSource.Now().Add(-store.args.CacheMinimumTtl)) {
srcUrl := strings.SplitN(key, "__", 2)[0]
origin := getOrigin(srcUrl)
store.metrics.cacheSize.WithLabelValues(origin).Dec()
delete(store.cache, key)
}
}
}

func (store *sourceMapsStoreImpl) CleanCachedErrors() {
store.cacheMut.Lock()
defer store.cacheMut.Unlock()

for key, cached := range store.cache {
if cached == nil {
delete(store.cache, key)
}
}
}

func (store *sourceMapsStoreImpl) getSourceMapContent(sourceURL string, release string) (content []byte, sourceMapURL string, err error) {
// Attempt to find the source map in the filesystem first.
for _, loc := range store.locs {
Expand Down
Loading