Gbuteyko/agent super queue #1647

Open · wants to merge 3 commits into base: master
42 changes: 34 additions & 8 deletions cmd/statshouse/statshouse.go
@@ -100,6 +100,8 @@ func runMain() int {
printVerbUsage()
return 1
}
var mappingCacheSize int64
var mappingCacheTTL int
// Motivation - some engine infrastructure cannot add options without a dash, so we allow both
// $> statshouse agent -a -b -c
// and
@@ -144,10 +146,14 @@ func runMain() int {
argvAddCommonFlags()
argvAddAgentFlags(false)
build.FlagParseShowVersionHelp()
mappingCacheSize = argv.configAgent.MappingCacheSize
mappingCacheTTL = argv.configAgent.MappingCacheTTL
case "aggregator":
argvAddCommonFlags()
argvAddAggregatorFlags(false)
build.FlagParseShowVersionHelp()
mappingCacheSize = argv.configAggregator.MappingCacheSize
mappingCacheTTL = argv.configAggregator.MappingCacheTTL
case "ingress_proxy":
argvAddCommonFlags()
argvAddAgentFlags(false)
@@ -157,6 +163,8 @@ func runMain() int {
flag.StringVar(&argv.configAggregator.MetadataNet, "metadata-net", aggregator.DefaultConfigAggregator().MetadataNet, "")
argv.configAgent = agent.DefaultConfig()
build.FlagParseShowVersionHelp()
mappingCacheSize = argv.configAgent.MappingCacheSize
mappingCacheTTL = argv.configAgent.MappingCacheTTL
case "tag_mapping":
mainTagMapping()
return 0
@@ -201,6 +209,7 @@ func runMain() int {
argv.cacheDir = filepath.Dir(argv.diskCacheFilename)
}
var dc *pcache.DiskCache // We support working without touching disk (on readonly filesystems, in stateless containers, etc)
var fpmc *os.File
if argv.cacheDir != "" {
var err error
if dc, err = pcache.OpenDiskCache(filepath.Join(argv.cacheDir, "mapping_cache.sqlite3"), pcache.DefaultTxDuration); err != nil {
@@ -213,7 +222,16 @@ func runMain() int {
// logErr.Printf("failed to close disk cache: %v", err)
// }
// }()

// we do not want to confuse mappings from different clusters, this would be a disaster
fpmc, err = os.OpenFile(filepath.Join(argv.cacheDir, fmt.Sprintf("mappings-%s.cache", argv.cluster)), os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
logErr.Printf("failed to open agent mappings cache: %v", err)
return 1
}
defer fpmc.Close()
}
mappingsCache := pcache.LoadMappingsCacheFile(fpmc, mappingCacheSize, mappingCacheTTL)

argv.configAgent.AggregatorAddresses = strings.Split(argv.aggAddr, ",")

@@ -232,22 +250,22 @@ func runMain() int {
logErr.Printf("-agg-addr must contain comma-separated list of 3 aggregators (1 shard is recommended)")
return 1
}
mainAgent(aesPwd, dc)
mainAgent(aesPwd, dc, mappingsCache)
case "aggregator":
mainAggregator(aesPwd, dc)
mainAggregator(aesPwd, dc, mappingsCache)
case "ingress_proxy":
if len(argv.configAgent.AggregatorAddresses) != 3 {
logErr.Printf("-agg-addr must contain comma-separated list of 3 aggregators (1 shard is recommended)")
return 1
}
mainIngressProxy(aesPwd)
mainIngressProxy(aesPwd, mappingsCache)
default:
logErr.Printf("Wrong command line verb or -new-conveyor argument %q, see --help for valid values", verb)
}
return 0
}

func mainAgent(aesPwd string, dc *pcache.DiskCache) int {
func mainAgent(aesPwd string, dc *pcache.DiskCache, mcagent *pcache.MappingsCache) int {
startDiscCacheTime := time.Now() // we only have disk cache before. Be careful when redesigning
argv.configAgent.Cluster = argv.cluster
if err := argv.configAgent.ValidateConfigSource(); err != nil {
@@ -282,6 +300,7 @@ func mainAgent(aesPwd string, dc *pcache.DiskCache) int {
format.TagValueIDComponentAgent,
metricStorage,
dc,
mcagent,
log.Printf,
func(a *agent.Agent, unixNow uint32) {
k := data_model.Key{
@@ -550,13 +569,16 @@ loop:
logOk.Printf("7. Waiting preprocessor to save %d buckets of historic data...", nonEmpty)
sh2.WaitPreprocessor()
shutdownInfo.StopPreprocessor = shutdownInfoDuration(&now).Nanoseconds()
logOk.Printf("8. Saving mappings...")
_ = mcagent.Save()
shutdownInfo.SaveMappings = shutdownInfoDuration(&now).Nanoseconds()
shutdownInfo.FinishShutdownTime = now.UnixNano()
shutdownInfoSave(argv.cacheDir, shutdownInfo)
logOk.Printf("Bye")
return 0
}

func mainAggregator(aesPwd string, dc *pcache.DiskCache) int {
func mainAggregator(aesPwd string, dc *pcache.DiskCache, mappingsCache *pcache.MappingsCache) int {
startDiscCacheTime := time.Now() // we only have disk cache before. Be careful when redesigning
if err := aggregator.ValidateConfigAggregator(argv.configAggregator); err != nil {
logErr.Printf("%s", err)
@@ -569,7 +591,7 @@ func mainAggregator(aesPwd string, dc *pcache.DiskCache) int {
logErr.Printf("--agg-addr to listen must be specified")
return 1
}
agg, err := aggregator.MakeAggregator(dc, argv.cacheDir, argv.aggAddr, aesPwd, argv.configAggregator, argv.customHostName, argv.logLevel == "trace")
agg, err := aggregator.MakeAggregator(dc, mappingsCache, argv.cacheDir, argv.aggAddr, aesPwd, argv.configAggregator, argv.customHostName, argv.logLevel == "trace")
if err != nil {
logErr.Printf("%v", err)
return 1
@@ -606,13 +628,16 @@ loop:
logOk.Printf("4. Waiting RPC clients to receive responses and disconnect...")
agg.WaitRPCServer(10 * time.Second)
shutdownInfo.StopRPCServer = shutdownInfoDuration(&now).Nanoseconds()
logOk.Printf("5. Saving mappings...")
_ = mappingsCache.Save()
shutdownInfo.SaveMappings = shutdownInfoDuration(&now).Nanoseconds()
shutdownInfo.FinishShutdownTime = now.UnixNano()
shutdownInfoSave(argv.cacheDir, shutdownInfo)
logOk.Printf("Bye")
return 0
}

func mainIngressProxy(aesPwd string) {
func mainIngressProxy(aesPwd string, mappingsCache *pcache.MappingsCache) {
// Ensure proxy configuration is valid
config := argv.configIngress
config.Network = "tcp"
@@ -634,7 +659,7 @@ func mainIngressProxy(aesPwd string) {
ctx, cancel := context.WithCancel(context.Background())
exit := make(chan error, 1)
go func() {
exit <- aggregator.RunIngressProxy2(ctx, config, aesPwd)
exit <- aggregator.RunIngressProxy2(ctx, config, aesPwd, mappingsCache)
}()
sigint := make(chan os.Signal, 1)
signal.Notify(sigint, syscall.SIGINT)
@@ -650,6 +675,7 @@ func mainIngressProxy(aesPwd string) {
logErr.Println(err)
cancel()
}
_ = mappingsCache.Save()
}

func listen(network, address string) net.Listener {
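The hunks above introduce a per-cluster mappings cache that lives alongside the existing disk cache: the file name embeds the cluster so mappings from different clusters are never mixed, the cache is loaded once before the verb dispatch, and Save() is called exactly once at the end of each component's shutdown sequence. Below is a minimal, self-contained sketch of that lifecycle; the toyMappingsCache type and its gob persistence are illustrative stand-ins, not the real pcache implementation.

package main

import (
	"encoding/gob"
	"fmt"
	"log"
	"os"
	"path/filepath"
)

// toyMappingsCache stands in for pcache.MappingsCache: tag string value -> int32 id.
type toyMappingsCache struct {
	f        *os.File // nil means "run without touching disk", as the PR allows
	mappings map[string]int32
}

func loadToyMappingsCache(f *os.File) *toyMappingsCache {
	c := &toyMappingsCache{f: f, mappings: map[string]int32{}}
	if f != nil {
		_ = gob.NewDecoder(f).Decode(&c.mappings) // an empty or unreadable file just yields an empty cache
	}
	return c
}

func (c *toyMappingsCache) Save() error {
	if c.f == nil {
		return nil
	}
	if _, err := c.f.Seek(0, 0); err != nil {
		return err
	}
	if err := c.f.Truncate(0); err != nil {
		return err
	}
	return gob.NewEncoder(c.f).Encode(c.mappings)
}

func main() {
	cacheDir, cluster := os.TempDir(), "example_cluster" // the real binary takes these from argv.cacheDir and argv.cluster
	// Per-cluster file name, so mappings from different clusters are never confused.
	f, err := os.OpenFile(filepath.Join(cacheDir, fmt.Sprintf("mappings-%s.cache", cluster)), os.O_CREATE|os.O_RDWR, 0666)
	if err != nil {
		log.Fatalf("failed to open mappings cache: %v", err)
	}
	defer f.Close()

	mc := loadToyMappingsCache(f)     // analogous to pcache.LoadMappingsCacheFile(fpmc, size, ttl)
	mc.mappings["production"] = 42    // in the real agent this is filled from aggregator responses
	if err := mc.Save(); err != nil { // called once, at the very end of shutdown
		log.Printf("failed to save mappings: %v", err)
	}
}
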
23 changes: 14 additions & 9 deletions cmd/statshouse/worker.go
@@ -48,23 +48,28 @@ func startWorker(sh2 *agent.Agent, metricStorage *metajournal.MetricsStorage, pm
return w
}

// func (w *worker) wait() {
// w.mapper.Stop()
// }

func (w *worker) HandleMetrics(args data_model.HandlerArgs) (h data_model.MappedMetricHeader, done bool) {
if w.logPackets != nil {
w.logPackets("Parsed metric: %s\n", args.MetricBytes.String())
}
w.fillTime(args, &h)
metaOk := w.fillMetricMeta(args, &h)
if metaOk {
done = w.mapper.Map(args, h.MetricMeta, &h)
} else {
w.mapper.MapEnvironment(args.MetricBytes, &h)
newConveyor := w.sh2.UseNewConveyor() || (h.MetricMeta != nil && h.MetricMeta.PipelineVersion == 3)
if newConveyor {
if metaOk {
w.sh2.Map(args, &h, w.autoCreate)
} else {
w.sh2.MapEnvironment(args.MetricBytes, &h)
}
done = true
} else {
if metaOk {
done = w.mapper.Map(args, h.MetricMeta, &h)
} else {
w.mapper.MapEnvironment(args.MetricBytes, &h)
done = true
}
}

if done {
if w.logPackets != nil {
w.printMetric("cached", *args.MetricBytes, h)
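The reworked HandleMetrics above now picks a pipeline per metric: the new v3 conveyor (Agent.Map / Agent.MapEnvironment) is used when either the remote-config flag enables it cluster-wide or the metric's own metadata carries PipelineVersion == 3; otherwise the existing mapper path is taken unchanged. A stripped-down restatement of just that decision, with hypothetical types rather than the worker's real ones:

package main

import "fmt"

// metricMeta is a stand-in for the metric's metadata type; only the field used by the decision is kept.
type metricMeta struct{ PipelineVersion int }

// useNewConveyor mirrors: w.sh2.UseNewConveyor() || (h.MetricMeta != nil && h.MetricMeta.PipelineVersion == 3)
func useNewConveyor(remoteConfigNewConveyor bool, meta *metricMeta) bool {
	return remoteConfigNewConveyor || (meta != nil && meta.PipelineVersion == 3)
}

func main() {
	fmt.Println(useNewConveyor(false, &metricMeta{PipelineVersion: 3})) // true: per-metric opt-in
	fmt.Println(useNewConveyor(true, nil))                              // true: cluster-wide opt-in via remote config
	fmt.Println(useNewConveyor(false, &metricMeta{PipelineVersion: 2})) // false: stays on the old conveyor
}
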
14 changes: 13 additions & 1 deletion internal/agent/agent.go
@@ -66,12 +66,14 @@ type Agent struct {
statshouseRemoteConfigString string // optimization
skipShards atomic.Int32 // copy from config.
builtinNewSharding atomic.Bool // copy from config.
builtinNewConveyor atomic.Bool // copy from config.

rUsage syscall.Rusage // accessed without lock by first shard addBuiltIns
heartBeatEventType int32 // first time "start", then "heartbeat"
heartBeatSecondBucket int // random [0..59] bucket for per minute heartbeat to spread load on aggregator
startTimestamp uint32

mappingsCache *pcache.MappingsCache
metricStorage format.MetaStorageInterface

componentTag int32 // agent or ingress proxy or aggregator (they have agents for built-in metrics)
@@ -107,7 +109,8 @@ type Agent struct {
}

// All shard aggregators must be on the same network
func MakeAgent(network string, storageDir string, aesPwd string, config Config, hostName string, componentTag int32, metricStorage format.MetaStorageInterface, dc *pcache.DiskCache, logF func(format string, args ...interface{}),
func MakeAgent(network string, storageDir string, aesPwd string, config Config, hostName string, componentTag int32, metricStorage format.MetaStorageInterface,
dc *pcache.DiskCache, mappingsCache *pcache.MappingsCache, logF func(format string, args ...interface{}),
beforeFlushBucketFunc func(s *Agent, nowUnix uint32), getConfigResult *tlstatshouse.GetConfigResult, envLoader *env.Loader) (*Agent, error) {
newClient := func() *rpc.Client {
return rpc.NewClient(
@@ -139,6 +142,7 @@ func MakeAgent(network string, storageDir string, aesPwd string, config Config,
args: string(format.ForceValidStringValue(allArgs)), // if single arg is too big, it is truncated here
logF: logF,
buildArchTag: format.GetBuildArchKey(runtime.GOARCH),
mappingsCache: mappingsCache,
metricStorage: metricStorage,
beforeFlushBucketFunc: beforeFlushBucketFunc,
envLoader: envLoader,
@@ -428,6 +432,7 @@ func (s *Agent) updateConfigRemotelyExperimental() {
s.skipShards.Store(0)
}
s.builtinNewSharding.Store(config.BuiltinNewSharding)
s.builtinNewConveyor.Store(config.BuiltinNewConveyor)
for _, shard := range s.Shards {
shard.mu.Lock()
shard.config = config
@@ -440,6 +445,9 @@ func (s *Agent) updateConfigRemotelyExperimental() {
shardReplica.config = config
shardReplica.mu.Unlock()
}
if s.componentTag != format.TagValueIDComponentAggregator { // aggregator has its own separate remote config for cache sizes
s.mappingsCache.SetSizeTTL(config.MappingCacheSize, config.MappingCacheTTL)
}
}

func (s *Agent) goFlusher(cancelFlushCtx context.Context, wg *sync.WaitGroup) {
Expand Down Expand Up @@ -508,6 +516,10 @@ func (s *Agent) CreateBuiltInItemValue(key *data_model.Key, meta *format.MetricM
return shard.CreateBuiltInItemValue(key)
}

func (s *Agent) UseNewConveyor() bool {
return s.builtinNewConveyor.Load()
}

func (s *Agent) ApplyMetric(m tlstatshouse.MetricBytes, h data_model.MappedMetricHeader, ingestionStatusOKTag int32) {
start := time.Now()
// Simply writing everything we know about metric ingestion errors would easily double how much metrics data we write
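The Agent changes above follow the existing remote-config pattern: updateConfigRemotelyExperimental stores the new BuiltinNewConveyor flag into an atomic that hot paths read via UseNewConveyor(), and it resizes the mappings cache via SetSizeTTL on every component except the aggregator, which keeps its own remote config for cache sizes. A minimal sketch of that pattern (hypothetical toyAgent type, standard library only):

package main

import (
	"fmt"
	"sync/atomic"
)

const componentAggregator = 4 // stand-in for format.TagValueIDComponentAggregator

type toyAgent struct {
	componentTag       int32
	builtinNewConveyor atomic.Bool
	cacheSize          atomic.Int64 // stand-in for what mappingsCache.SetSizeTTL would receive
	cacheTTLSeconds    atomic.Int64
}

func (a *toyAgent) applyRemoteConfig(newConveyor bool, size, ttlSeconds int64) {
	a.builtinNewConveyor.Store(newConveyor)
	if a.componentTag != componentAggregator { // the aggregator keeps its own remote config for cache sizes
		a.cacheSize.Store(size)
		a.cacheTTLSeconds.Store(ttlSeconds)
	}
}

// UseNewConveyor is read lock-free on the hot path, like the new Agent.UseNewConveyor above.
func (a *toyAgent) UseNewConveyor() bool { return a.builtinNewConveyor.Load() }

func main() {
	a := &toyAgent{componentTag: 1} // some non-aggregator component
	a.applyRemoteConfig(true, 100<<20, 86400)
	fmt.Println(a.UseNewConveyor(), a.cacheSize.Load()) // true 104857600
}
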
90 changes: 90 additions & 0 deletions internal/agent/agent_mapping.go
@@ -0,0 +1,90 @@
// Copyright 2022 V Kontakte LLC
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

package agent

import (
"github.com/vkcom/statshouse/internal/data_model"
"github.com/vkcom/statshouse/internal/data_model/gen2/tl"
"github.com/vkcom/statshouse/internal/data_model/gen2/tlstatshouse"
"github.com/vkcom/statshouse/internal/format"
"go4.org/mem"
)

func (s *Agent) Map(args data_model.HandlerArgs, h *data_model.MappedMetricHeader, autoCreate *data_model.AutoCreate) {
s.mapAllTags(h, args.MetricBytes, autoCreate)
if h.IngestionStatus != 0 {
return
}
// validate values only if metric is valid
h.IngestionStatus = data_model.ValidateMetricData(args.MetricBytes)
h.ValuesChecked = true // not used in v3, just to avoid confusion
}

// mapAllTags processes all tags in a single pass, including environment tag
// unlike v2, it doesn't stop on the first invalid tag
func (s *Agent) mapAllTags(h *data_model.MappedMetricHeader, metric *tlstatshouse.MetricBytes, autoCreate *data_model.AutoCreate) {
for i := 0; i < len(metric.Tags); i++ {
v := &metric.Tags[i]
tagMeta, tagIDKey, valid := data_model.ValidateTag(v, metric, h, autoCreate)
if !valid {
continue
}
if tagIDKey == 0 { // that tag is not in metric meta
continue
}
switch {
case tagMeta.SkipMapping:
h.SetSTag(tagMeta.Index, string(v.Value), tagIDKey)
case tagMeta.Index == format.StringTopTagIndex:
h.SValue = v.Value
if h.IsSKeySet {
h.TagSetTwiceKey = tagIDKey
}
h.IsSKeySet = true
case len(v.Value) == 0: // this case is also valid for raw values
h.SetTag(tagMeta.Index, 0, tagIDKey) // we interpret "1" => "vasya", "1" => "petya" as second one overriding the first, but generating a warning
case tagMeta.Raw:
id, ok := format.ContainsRawTagValue(mem.B(v.Value))
if !ok {
h.InvalidRawValue = v.Value
h.InvalidRawTagKey = tagIDKey
continue
}
h.SetTag(tagMeta.Index, id, tagIDKey)
default:
id, found := s.mappingsCache.GetValueBytes(uint32(h.ReceiveTime.Unix()), v.Value)
if found {
h.SetTag(tagMeta.Index, id, tagIDKey)
} else {
h.SetSTag(tagMeta.Index, string(v.Value), tagIDKey)
}
}
}
}

func (s *Agent) mapEnvironmentTag(h *data_model.MappedMetricHeader, v *tl.DictionaryFieldStringBytes) {
var err error
v.Value, err = format.AppendValidStringValue(v.Value[:0], v.Value)
if err != nil || len(v.Value) == 0 {
return
}
id, found := s.mappingsCache.GetValueBytes(uint32(h.ReceiveTime.Unix()), v.Value)
if found {
h.Key.Tags[0] = id
}
}

// Subset of Map which only maps environment and produces no errors. Used to report environment of not found metrics.
func (s *Agent) MapEnvironment(metric *tlstatshouse.MetricBytes, h *data_model.MappedMetricHeader) {
for _, v := range metric.Tags {
if string(v.Key) != format.EnvTagID {
continue
}
s.mapEnvironmentTag(h, &v)
return
}
}
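
The new mapAllTags above replaces the v2 request/response mapper with a single pass over the tags: each tag is either stored as-is (SkipMapping), routed to the string-top slot, treated as empty, parsed as a raw value, resolved through the local mappings cache, or — on a cache miss — carried through as a string so the aggregator can map it; a miss is no longer an error and does not stop processing of the remaining tags. The toy decision function below restates that order with hypothetical types and simplified raw-value parsing (the real code uses format.ContainsRawTagValue and per-tag metadata):

package main

import (
	"fmt"
	"strconv"
)

type tagOutcome string

func mapOneTag(skipMapping, isStringTop, raw bool, value string, cache map[string]int32) tagOutcome {
	switch {
	case skipMapping:
		return tagOutcome("stag:" + value) // stored as a string tag verbatim
	case isStringTop:
		return tagOutcome("svalue:" + value) // goes to the string-top slot
	case value == "":
		return "tag:0" // empty value maps to 0, even for raw tags
	case raw:
		if _, err := strconv.Atoi(value); err != nil { // simplified stand-in for format.ContainsRawTagValue
			return "invalid-raw-value"
		}
		return tagOutcome("tag:" + value)
	default:
		if id, ok := cache[value]; ok {
			return tagOutcome(fmt.Sprintf("tag:%d", id)) // cache hit: mapped locally
		}
		return tagOutcome("stag:" + value) // cache miss: the aggregator will map it
	}
}

func main() {
	cache := map[string]int32{"production": 42}
	fmt.Println(mapOneTag(false, false, false, "production", cache)) // tag:42
	fmt.Println(mapOneTag(false, false, false, "staging", cache))    // stag:staging
	fmt.Println(mapOneTag(false, false, true, "17", cache))          // tag:17
}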
10 changes: 10 additions & 0 deletions internal/agent/agent_shard.go
@@ -358,6 +358,16 @@ func (s *Shard) addBuiltInsLocked() {
v.value = data_model.ItemValue{} // simply reset Counter, even if somehow <0
v.mu.Unlock()
}
elements, sumSize, averageTS, adds, evicts, timestampUpdates, timestampUpdateSkips := s.agent.mappingsCache.Stats()
if elements > 0 {
getMultiItem(s.CurrentTime, format.BuiltinMetricIDMappingCacheElements, [16]int32{0, s.agent.componentTag}).Tail.AddValueCounterHost(s.rng, float64(elements), 1, 0)
getMultiItem(s.CurrentTime, format.BuiltinMetricIDMappingCacheSize, [16]int32{0, s.agent.componentTag}).Tail.AddValueCounterHost(s.rng, float64(sumSize), 1, 0)
getMultiItem(s.CurrentTime, format.BuiltinMetricIDMappingCacheAverageTTL, [16]int32{0, s.agent.componentTag}).Tail.AddValueCounterHost(s.rng, float64(s.CurrentTime)-float64(averageTS), 1, 0)
getMultiItem(s.CurrentTime, format.BuiltinMetricIDMappingCacheEvent, [16]int32{0, s.agent.componentTag, format.TagValueIDMappingCacheEventAdd}).Tail.AddCounterHost(s.rng, float64(adds), 0)
getMultiItem(s.CurrentTime, format.BuiltinMetricIDMappingCacheEvent, [16]int32{0, s.agent.componentTag, format.TagValueIDMappingCacheEventEvict}).Tail.AddCounterHost(s.rng, float64(evicts), 0)
getMultiItem(s.CurrentTime, format.BuiltinMetricIDMappingCacheEvent, [16]int32{0, s.agent.componentTag, format.TagValueIDMappingCacheEventTimestampUpdate}).Tail.AddCounterHost(s.rng, float64(timestampUpdates), 0)
getMultiItem(s.CurrentTime, format.BuiltinMetricIDMappingCacheEvent, [16]int32{0, s.agent.componentTag, format.TagValueIDMappingCacheEventTimestampUpdateSkip}).Tail.AddCounterHost(s.rng, float64(timestampUpdateSkips), 0)
}

sizeMem := s.HistoricBucketsDataSize
sizeDiskTotal, sizeDiskUnsent := s.HistoricBucketsDataSizeDisk()
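The per-shard built-ins above expose the cache's health once it holds at least one element: element count, total byte size, add/evict/timestamp-update event counters, and an average age reported as CurrentTime minus the cache's average entry timestamp. A compact sketch of that reporting, with a hypothetical cacheStats struct and illustrative metric names (the real values come from mappingsCache.Stats() and land in the BuiltinMetricIDMappingCache* family):

package main

import "fmt"

// cacheStats is a hypothetical bundling of the seven values returned by Stats().
type cacheStats struct {
	elements, sumSize                                    int64
	averageTS                                            uint32 // average timestamp of cached entries
	adds, evicts, timestampUpdates, timestampUpdateSkips int64
}

func reportCacheBuiltins(currentTime uint32, st cacheStats, emit func(name string, value float64)) {
	if st.elements == 0 {
		return // cache unused so far: emit nothing, as the PR does
	}
	emit("mapping_cache_elements", float64(st.elements))
	emit("mapping_cache_size", float64(st.sumSize))
	emit("mapping_cache_average_ttl", float64(currentTime)-float64(st.averageTS)) // average entry age in seconds
	emit("mapping_cache_event_add", float64(st.adds))
	emit("mapping_cache_event_evict", float64(st.evicts))
	emit("mapping_cache_event_timestamp_update", float64(st.timestampUpdates))
	emit("mapping_cache_event_timestamp_update_skip", float64(st.timestampUpdateSkips))
}

func main() {
	st := cacheStats{elements: 10, sumSize: 4096, averageTS: 1_699_990_000, adds: 12, evicts: 2}
	reportCacheBuiltins(1_700_000_000, st, func(name string, v float64) { fmt.Println(name, v) })
}
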
6 changes: 5 additions & 1 deletion internal/agent/agent_shard_send.go
@@ -359,7 +359,7 @@ func (s *Shard) sampleBucket(bucket *data_model.MetricsBucket, rnd *rand.Rand) (
}

func (s *Shard) sendToSenders(bucket *data_model.MetricsBucket, sampleFactors []tlstatshouse.SampleFactor) {
version := uint8(2) // TODO - change version to 3 after all aggregators updated
version := uint8(3)
data, err := s.compressBucket(bucket, sampleFactors, version)
cbd := compressedBucketData{time: bucket.Time, data: data, version: version} // No id as not saved to disk yet
if err != nil {
@@ -443,6 +443,7 @@ func (s *Shard) sendRecent(cancelCtx context.Context, cbd compressedBucketData,
err, cbd.time, s.ShardKey)
return false
}
s.agent.mappingsCache.AddValues(cbd.time, respV3.Mappings)
if len(respV3.Warning) != 0 {
s.agent.logF("Send Warning: %s, moving bucket %d to historic conveyor for shard %d",
respV3.Warning, cbd.time, s.ShardKey)
@@ -560,6 +561,9 @@ func (s *Shard) sendHistoric(cancelCtx context.Context, cbd compressedBucketData
if len(respV3.Warning) != 0 {
s.agent.logF("Send historic bucket returned: %s", respV3.Warning)
}
// we choose to use bucket time here, because we do not map new strings for historic buckets
// at the moment of sending, if they were not mapped at the moment of saving
s.agent.mappingsCache.AddValues(cbd.time, respV3.Mappings)
if !respV3.IsSetDiscard() {
shardReplica.stats.historicSendKeep.Add(1)
select {
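With the bucket format bumped to version 3, the send path above closes the mapping loop: string tags with no cached mapping are sent as strings, the aggregator's v3 response carries the mappings it created, and the agent feeds them into the cache via mappingsCache.AddValues keyed by the bucket time (for historic buckets the original bucket time is used deliberately, as the comment explains). A toy illustration of that feedback loop, with hypothetical types:

package main

import "fmt"

// mapping is a stand-in for one entry of respV3.Mappings: string value -> int32 id.
type mapping struct {
	Str   string
	Value int32
}

type toyCache map[string]int32

// addValues mirrors s.agent.mappingsCache.AddValues(cbd.time, respV3.Mappings).
func (c toyCache) addValues(bucketTime uint32, ms []mapping) {
	_ = bucketTime // the real cache also records timestamps to drive TTL-based eviction
	for _, m := range ms {
		c[m.Str] = m.Value
	}
}

func main() {
	cache := toyCache{}
	// Mappings returned by the aggregator for string tags it saw in this bucket.
	cache.addValues(1_700_000_000, []mapping{{"production", 42}, {"staging", 43}})
	fmt.Println(cache["production"]) // 42: the next bucket maps this tag locally, without asking the aggregator again
}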