Commit e3f2ae6

Merge pull request #1688 from kube-logging/backpressure-isolate

isolate input sources using fixed hash tag and backpressure

2 parents db85ac6 + 73114ef

File tree

5 files changed: +163 −61 lines

config/samples/multitenant-routing/logging/tenant-infra-logging.yaml

Lines changed: 3 additions & 4 deletions

@@ -51,10 +51,6 @@ spec:
   loggingRef: infra
   inputTail:
     storage.type: filesystem
-  forwardOptions:
-    Workers: 0
-  syslogng_output:
-    Workers: 0
   positiondb:
     hostPath:
       path: ""
@@ -63,6 +59,9 @@ spec:
       path: ""
   network:
     connectTimeout: 2
+  metrics: {}
+  image:
+    tag: 2.1.8-debug
 ---
 apiVersion: logging.banzaicloud.io/v1beta1
 kind: LoggingRoute

docs/fluentbit-flow-control.md

Lines changed: 55 additions & 0 deletions (new file)

## Flow control with durability in a multi-tenant setup

Resources:
- https://docs.fluentbit.io/manual/administration/backpressure
- https://docs.fluentbit.io/manual/administration/buffering-and-storage
- https://docs.fluentbit.io/manual/pipeline/inputs/tail#sqlite-and-write-ahead-logging
- https://docs.fluentbit.io/manual/administration/monitoring
- https://docs.fluentbit.io/manual/administration/troubleshooting#dump-internals-signal

### Context

Let's consider we have multiple separate inputs, each sending data to its own dedicated output (using tenant ids in the tags).

### Durability

According to the referenced resources, we need `storage.type filesystem` for *every input*
where we want to avoid losing data. If we just enable this option, there is no limit
on how much data fluent-bit keeps on disk.

> Note: we also have to configure the position db to avoid fluent-bit
> reading the same files from the beginning after a restart.

### Memory limit

The limit applied by default is `storage.max_chunks_up 128` on the *service*, which is a global limit.
But this only means that even if fluent-bit writes all chunks to disk, there is a limit on how many
chunks it can load and handle in memory at the same time.
Without any further configuration, fluent-bit will write chunks to disk indefinitely and this setting will only
affect the overall throughput.

### Disk usage limit

In case we want to limit the actual disk usage, we need to set `storage.total_limit_size` for
every *output* individually. This sounds good, but the problem with this option is that it doesn't
cause any backpressure; it just starts to discard the oldest data, which obviously results in data loss,
so this option should be used with care.

### Backpressure

Backpressure can be enabled using `storage.pause_on_chunks_overlimit on` on the *input*, which is great, but with one important
caveat: the limit this setting considers as the trigger event is `storage.max_chunks_up`, which is a global limit.

Going back to our main scenario, when one of the outputs is down (tenant is down), chunks for that output start to pile up
on disk and in memory. When there are more than `storage.max_chunks_up` chunks in memory globally, fluent-bit pauses inputs that
try to load additional chunks. It's not clear how fluent-bit decides which input should be paused, but based on our
observations (using `config/samples/multitenant-routing` for example) this works as expected: only the input that belongs
to the faulty output is paused, and when the output gets back online, the input is resumed immediately.

Also, based on fluent-bit's metrics, if an output is permanently down, the chunks waiting to be sent to that output
are not kept in memory, so other input/output pairs are not limited in throughput.

In case we configure `storage.pause_on_chunks_overlimit` on the inputs, we can make sure the disk usage is bounded.

As long as pods are not restarting, the backpressure can prevent log loss, but keep in mind that since the input is paused,
data in log files that get deleted by the container runtime during the output's downtime will be lost.
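Put together, the durability and backpressure settings described above could look like the following sketch of a classic fluent-bit configuration. The paths, host, and size limit here are illustrative assumptions, not values from this commit:

```
[SERVICE]
    # storage.max_chunks_up is a global cap on chunks held in memory (default 128)
    storage.path          /buffers
    storage.max_chunks_up 128

[INPUT]
    Name                              tail
    Path                              /var/log/containers/*_tenant-a_*.log
    Tag                               tenant-a.*
    DB                                /tail-db/tail-containers-state.db
    # durability: buffer chunks on disk instead of memory only
    storage.type                      filesystem
    # backpressure: pause this input when the global chunk limit is hit
    storage.pause_on_chunks_overlimit on

[OUTPUT]
    Name                     forward
    Match                    tenant-a.*
    Host                     aggregator.example.svc
    # optional hard cap on disk usage; discards oldest data when exceeded
    storage.total_limit_size 1G
```

Note that `storage.total_limit_size` trades durability for bounded disk usage, while `storage.pause_on_chunks_overlimit` achieves the bound without discarding already-buffered data.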

pkg/resources/fluentbit/config.go

Lines changed: 30 additions & 25 deletions

@@ -55,21 +55,14 @@ var fluentBitConfigTemplate = `
 {{- end }}
 {{- end }}

-[INPUT]
-    Name tail
-    {{- range $key, $value := .Input.Values }}
-    {{- if $value }}
-    {{ $key }} {{$value}}
-    {{- end }}
-    {{- end }}
-    {{- range $id, $v := .Input.ParserN }}
-    {{- if $v }}
-    Parse_{{ $id}} {{$v}}
-    {{- end }}
-    {{- end }}
-    {{- if .Input.MultilineParser }}
-    multiline.parser {{- range $i, $v := .Input.MultilineParser }}{{ if $i }},{{ end}} {{ $v }}{{ end }}
-    {{- end }}
+{{- if .Inputs }}
+{{- range $input := .Inputs }}
+# Tenant: {{ $input.Tenant }}
+{{- template "input" $input }}
+{{- end }}
+{{- else }}
+{{- template "input" .Input }}
+{{- end }}

 {{- if not .DisableKubernetesFilter }}
 [FILTER]
@@ -111,11 +104,7 @@ var fluentBitConfigTemplate = `
 {{- range $target := $out.Targets }}
 [OUTPUT]
     Name forward
-    {{- if $target.AllNamespaces }}
-    Match *
-    {{- else }}
-    Match_Regex {{ $target.NamespaceRegex }}
-    {{- end }}
+    Match {{ $target.Match }}
     {{- if $out.Upstream.Enabled }}
     Upstream {{ $out.Upstream.Config.Path }}
     {{- else }}
@@ -149,11 +138,7 @@ var fluentBitConfigTemplate = `
 {{- range $target := $out.Targets }}
 [OUTPUT]
     Name tcp
-    {{- if $target.AllNamespaces }}
-    Match *
-    {{- else }}
-    Match_Regex {{ $target.NamespaceRegex }}
-    {{- end }}
+    Match {{ $target.Match }}
     Host {{ $target.Host }}
     Port {{ $target.Port }}
     Format json_lines
@@ -203,6 +188,26 @@ var fluentbitNetworkTemplate = `
 {{- end }}
 `

+var fluentbitInputTemplate = `
+{{- define "input" }}
+[INPUT]
+    Name tail
+    {{- range $key, $value := .Values }}
+    {{- if $value }}
+    {{ $key }} {{$value}}
+    {{- end }}
+    {{- end }}
+    {{- range $id, $v := .ParserN }}
+    {{- if $v }}
+    Parse_{{ $id}} {{$v}}
+    {{- end }}
+    {{- end }}
+    {{- if .MultilineParser }}
+    multiline.parser {{- range $i, $v := .MultilineParser }}{{ if $i }},{{ end}} {{ $v }}{{ end }}
+    {{- end }}
+{{- end }}
+`
+
 var upstreamConfigTemplate = `
 [UPSTREAM]
     Name {{ .Config.Name }}

pkg/resources/fluentbit/configsecret.go

Lines changed: 22 additions & 7 deletions

@@ -38,6 +38,13 @@ type fluentbitInputConfig struct {
     MultilineParser []string
 }

+type fluentbitInputConfigWithTenant struct {
+    Tenant          string
+    Values          map[string]string
+    ParserN         []string
+    MultilineParser []string
+}
+
 type upstreamNode struct {
     Name string
     Host string
@@ -63,6 +70,7 @@ type fluentBitConfig struct {
     CoroStackSize           int32
     Output                  map[string]string
     Input                   fluentbitInputConfig
+    Inputs                  []fluentbitInputConfigWithTenant
     DisableKubernetesFilter bool
     KubernetesFilter        map[string]string
     AwsFilter               map[string]string
@@ -86,8 +94,8 @@ type fluentForwardOutputConfig struct {
 }

 type forwardTargetConfig struct {
-    AllNamespaces  bool
     NamespaceRegex string
+    Match          string
     Host           string
     Port           int32
 }
@@ -373,22 +381,25 @@ func (r *Reconciler) configSecret() (runtime.Object, reconciler.DesiredState, er
     for _, a := range loggingResources.LoggingRoutes {
         tenants = append(tenants, a.Status.Tenants...)
     }
+    if err := r.configureInputsForTenants(tenants, &input); err != nil {
+        return nil, nil, errors.WrapIf(err, "configuring inputs for target tenants")
+    }
     if err := r.configureOutputsForTenants(ctx, tenants, &input); err != nil {
         return nil, nil, errors.WrapIf(err, "configuring outputs for target tenants")
     }
 } else {
     // compatibility with existing configuration
     if input.FluentForwardOutput != nil {
         input.FluentForwardOutput.Targets = append(input.FluentForwardOutput.Targets, forwardTargetConfig{
-            AllNamespaces: true,
-            Host:          input.FluentForwardOutput.TargetHost,
-            Port:          input.FluentForwardOutput.TargetPort,
+            Match: "*",
+            Host:  input.FluentForwardOutput.TargetHost,
+            Port:  input.FluentForwardOutput.TargetPort,
         })
     } else if input.SyslogNGOutput != nil {
         input.SyslogNGOutput.Targets = append(input.SyslogNGOutput.Targets, forwardTargetConfig{
-            AllNamespaces: true,
-            Host:          input.SyslogNGOutput.Host,
-            Port:          input.SyslogNGOutput.Port,
+            Match: "*",
+            Host:  input.SyslogNGOutput.Host,
+            Port:  input.SyslogNGOutput.Port,
         })
     }
 }
@@ -455,6 +466,10 @@ func generateConfig(input fluentBitConfig) (string, error) {
     if err != nil {
         return "", errors.WrapIf(err, "parsing fluentbit network nested template")
     }
+    tmpl, err = tmpl.Parse(fluentbitInputTemplate)
+    if err != nil {
+        return "", errors.WrapIf(err, "parsing fluentbit input nested template")
+    }
     err = tmpl.Execute(output, input)
     if err != nil {
         return "", errors.WrapIf(err, "executing fluentbit config template")

pkg/resources/fluentbit/tenants.go

Lines changed: 53 additions & 25 deletions

@@ -16,12 +16,14 @@ package fluentbit

 import (
     "context"
+    "crypto/sha256"
+    "encoding/hex"
     "fmt"
     "sort"
     "strings"

     "emperror.dev/errors"
-    "golang.org/x/exp/slices"
+    "golang.org/x/exp/maps"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/types"
     "sigs.k8s.io/controller-runtime/pkg/client"
@@ -71,31 +73,17 @@ func FindTenants(ctx context.Context, target metav1.LabelSelector, reader client
         }
     }

-    sort.Slice(tenants, func(i, j int) bool {
+    sort.SliceStable(tenants, func(i, j int) bool {
         return tenants[i].Name < tenants[j].Name
     })
-    // Make sure our tenant list is stable
-    slices.SortStableFunc(tenants, func(a, b Tenant) int {
-        if a.Name < b.Name {
-            return -1
-        }
-        if a.Name == b.Name {
-            return 0
-        }
-        return 1
-    })

     return tenants, nil
 }

 func (r *Reconciler) configureOutputsForTenants(ctx context.Context, tenants []v1beta1.Tenant, input *fluentBitConfig) error {
     var errs error
     for _, t := range tenants {
-        allNamespaces := len(t.Namespaces) == 0
-        namespaceRegex := `.`
-        if !allNamespaces {
-            namespaceRegex = fmt.Sprintf("^[^_]+_(%s)_", strings.Join(t.Namespaces, "|"))
-        }
+        match := fmt.Sprintf("kubernetes.%s.*", hashFromTenantName(t.Name))
         logging := &v1beta1.Logging{}
         if err := r.resourceReconciler.Client.Get(ctx, types.NamespacedName{Name: t.Name}, logging); err != nil {
             return errors.WrapIf(err, "getting logging resource")
@@ -113,24 +101,64 @@ func (r *Reconciler) configureOutputsForTenants(ctx context.Context, tenants []v
             input.FluentForwardOutput = &fluentForwardOutputConfig{}
         }
         input.FluentForwardOutput.Targets = append(input.FluentForwardOutput.Targets, forwardTargetConfig{
-            AllNamespaces:  allNamespaces,
-            NamespaceRegex: namespaceRegex,
-            Host:           aggregatorEndpoint(logging, fluentd.ServiceName),
-            Port:           fluentd.ServicePort,
+            Match: match,
+            Host:  aggregatorEndpoint(logging, fluentd.ServiceName),
+            Port:  fluentd.ServicePort,
         })
     } else if _, syslogNGSPec := loggingResources.GetSyslogNGSpec(); syslogNGSPec != nil {
         if input.SyslogNGOutput == nil {
             input.SyslogNGOutput = newSyslogNGOutputConfig()
         }
         input.SyslogNGOutput.Targets = append(input.SyslogNGOutput.Targets, forwardTargetConfig{
-            AllNamespaces:  allNamespaces,
-            NamespaceRegex: namespaceRegex,
-            Host:           aggregatorEndpoint(logging, syslogng.ServiceName),
-            Port:           syslogng.ServicePort,
+            Match: match,
+            Host:  aggregatorEndpoint(logging, syslogng.ServiceName),
+            Port:  syslogng.ServicePort,
         })
     } else {
         errs = errors.Append(errs, errors.Errorf("logging %s does not provide any aggregator configured", t.Name))
     }
     }
     return errs
 }
+
+func (r *Reconciler) configureInputsForTenants(tenants []v1beta1.Tenant, input *fluentBitConfig) error {
+    var errs error
+    for _, t := range tenants {
+        allNamespaces := len(t.Namespaces) == 0
+        tenantValues := maps.Clone(input.Input.Values)
+        if !allNamespaces {
+            var paths []string
+            for _, n := range t.Namespaces {
+                paths = append(paths, fmt.Sprintf("/var/log/containers/*_%s_*.log", n))
+            }
+            tenantValues["Path"] = strings.Join(paths, ",")
+        } else {
+            tenantValues["Path"] = "/var/log/containers/*.log"
+        }
+
+        tenantValues["DB"] = fmt.Sprintf("/tail-db/tail-containers-state-%s.db", t.Name)
+        tenantValues["Tag"] = fmt.Sprintf("kubernetes.%s.*", hashFromTenantName(t.Name))
+        // This helps to make sure we apply backpressure on the input, see https://docs.fluentbit.io/manual/administration/backpressure
+        tenantValues["storage.pause_on_chunks_overlimit"] = "on"
+        input.Inputs = append(input.Inputs, fluentbitInputConfigWithTenant{
+            Tenant:          t.Name,
+            Values:          tenantValues,
+            ParserN:         input.Input.ParserN,
+            MultilineParser: input.Input.MultilineParser,
+        })
+    }
+    // the regex will work only if we cut the prefix off. fluent-bit doesn't care about the content, just the length
+    input.KubernetesFilter["Kube_Tag_Prefix"] = `kubernetes.0000000000.var.log.containers.`
+    return errs
+}
+
+func hashFromTenantName(input string) string {
+    hasher := sha256.New()
+    hasher.Write([]byte(input))
+    hashBytes := hasher.Sum(nil)
+
+    // Convert the hash to a hex string
+    hashString := hex.EncodeToString(hashBytes)
+
+    return hashString[0:10]
+}
