Skip to content
This repository was archived by the owner on Jan 20, 2026. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions base-image/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ ENV BUILDDEPS="\
shadow \
which \
wget \
which \
vim \
git \
less \
Expand Down Expand Up @@ -48,7 +47,7 @@ RUN tdnf clean all && \

SHELL [ "/bin/bash", "-l", "-c" ]

COPY failsafe.conf entrypoint.sh Gemfile Gemfile.lock /fluentd/
COPY entrypoint.sh Gemfile Gemfile.lock /fluentd/

# Install the gems with bundler is better practice
# We need to keep this as a single layer because of the builddeps
Expand All @@ -63,6 +62,16 @@ RUN tdnf install -y $BUILDDEPS \
&& rvm install --disable-binary $RUBY_VERSION --default \
&& gem update --system --no-document \
&& gem install bundler -v '>= 2.4.15' --default --no-document \
&& rm -rf $RVM_PATH/src $RVM_PATH/examples $RVM_PATH/docs $RVM_PATH/archives \
$RUBY_PATH/lib/ruby/gems/3.*/cache $RUBY_PATH/lib/ruby/gems/3.*/doc/ \
/usr/share/doc /root/.bundle/cache \
&& rvm cleanup all \
&& gem sources --clear-all \
&& gem cleanup \
&& tdnf remove -y $BUILDDEPS \
&& tdnf clean all

RUN tdnf install -y $BUILDDEPS \
&& mkdir -p /fluentd/log /fluentd/etc /fluentd/plugins /usr/local/bundle/bin/ \
&& echo 'gem: --no-document' >> /etc/gemrc \
&& bundle config silence_root_warning true \
Expand All @@ -74,17 +83,13 @@ RUN tdnf install -y $BUILDDEPS \
&& gem sources --clear-all \
&& ln -s $(which fluentd) /usr/local/bundle/bin/fluentd \
&& gem cleanup \
&& rvm cleanup all \
&& rm -rf $RVM_PATH/src $RVM_PATH/examples $RVM_PATH/docs $RVM_PATH/archives \
$RUBY_PATH/lib/ruby/gems/3.*/cache $RUBY_PATH/lib/ruby/gems/3.*/doc/ \
/usr/share/doc /root/.bundle/cache \
## Install jemalloc
&& curl -sLo /tmp/jemalloc-5.3.0.tar.bz2 https://github.com/jemalloc/jemalloc/releases/download/5.3.0/jemalloc-5.3.0.tar.bz2 \
&& tar -C /tmp/ -xjvf /tmp/jemalloc-5.3.0.tar.bz2 \
&& cd /tmp/jemalloc-5.3.0 \
&& ./configure && make \
&& mv -v lib/libjemalloc.so* /usr/lib \
&& rm -rf /tmp/jemalloc-5.3.0 /tmp/jemalloc-5.3.0.tar.bz2 \
&& rm -rf /tmp/* \
# cleanup build deps
&& tdnf remove -y $BUILDDEPS \
&& tdnf clean all
Expand Down
4 changes: 2 additions & 2 deletions base-image/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ source "https://rubygems.org"
# pin fluentd, probably a good idea to pin all gems
gem "fluentd", "1.16.1"

gem 'oj', '3.13.23'
gem 'oj', '3.15'
gem 'ffi'
gem 'fluent-plugin-amqp', "0.14.0"
gem 'fluent-plugin-azure-loganalytics', "0.7.0"
Expand Down Expand Up @@ -42,7 +42,7 @@ gem 'fluent-plugin-systemd', "1.0.5"
gem 'fluent-plugin-uri-parser', "0.3.0"
gem 'fluent-plugin-verticajson', "0.0.6"
gem 'fluent-plugin-vmware-loginsight', "1.4.1"
gem 'fluent-plugin-vmware-log-intelligence', "2.0.6"
gem 'fluent-plugin-vmware-log-intelligence', "2.0.7"
# fluent-plugin-mysqlslowquery is dependency for fluent-plugin-vmware-log-intelligence
gem 'fluent-plugin-mysqlslowquery', "0.0.9"
gem 'fluent-plugin-throttle', '0.0.5'
Expand Down
13 changes: 6 additions & 7 deletions base-image/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ GEM
amq-protocol (2.3.2)
attr_required (1.0.1)
aws-eventstream (1.2.0)
aws-partitions (1.782.0)
aws-partitions (1.783.0)
aws-sdk-cloudwatchlogs (1.66.0)
aws-sdk-core (~> 3, >= 3.176.0)
aws-sigv4 (~> 1.1)
Expand Down Expand Up @@ -52,14 +52,14 @@ GEM
connection_pool (2.4.1)
cool.io (1.7.1)
date (3.3.3)
digest-crc (0.6.4)
digest-crc (0.6.5)
rake (>= 12.0.0, < 14.0.0)
docker-api (2.2.0)
excon (>= 0.47.0)
multi_json
domain_name (0.5.20190701)
unf (>= 0.0.5, < 1.0.0)
elastic-transport (8.2.1)
elastic-transport (8.2.2)
faraday (< 3)
multi_json
elasticsearch (8.8.0)
Expand Down Expand Up @@ -215,7 +215,7 @@ GEM
fluentd (>= 0.14.22, < 2)
json
vertica
fluent-plugin-vmware-log-intelligence (2.0.6)
fluent-plugin-vmware-log-intelligence (2.0.7)
fluent-plugin-mysqlslowquery (>= 0.0.9)
fluentd (>= 0.14.20)
http (>= 0.9.8)
Expand Down Expand Up @@ -347,7 +347,7 @@ GEM
serverengine (2.3.2)
sigdump (~> 0.2.2)
set (1.0.3)
sigdump (0.2.4)
sigdump (0.2.5)
sorted_set (1.0.3)
rbtree
set (~> 1.0)
Expand Down Expand Up @@ -389,7 +389,6 @@ GEM
PLATFORMS
ruby
x86_64-darwin-21
x86_64-darwin-22
x86_64-linux

DEPENDENCIES
Expand Down Expand Up @@ -433,7 +432,7 @@ DEPENDENCIES
fluent-plugin-throttle (= 0.0.5)
fluent-plugin-uri-parser (= 0.3.0)
fluent-plugin-verticajson (= 0.0.6)
fluent-plugin-vmware-log-intelligence (= 2.0.6)
fluent-plugin-vmware-log-intelligence (= 2.0.7)
fluent-plugin-vmware-loginsight (= 1.4.1)
fluent-plugin-webhdfs (= 1.5.0)
fluentd (= 1.16.1)
Expand Down
44 changes: 39 additions & 5 deletions config-reloader/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,15 @@ type Config struct {
AllowTagExpansion bool
AdminNamespace string
// parsed or processed/cached fields
level logrus.Level
ParsedMetaValues map[string]string
ParsedLabelSelector labels.Set
ExecTimeoutSeconds int
ReadBytesLimit int
level logrus.Level
ParsedMetaValues map[string]string
ParsedLabelSelector labels.Set
ExecTimeoutSeconds int
ReadBytesLimit int
SplitPattern string
ParsedSplitPattern []string
RemovePatterns string
ParsedRemovePatterns []string
}

var defaultConfig = &Config{
Expand All @@ -80,6 +84,8 @@ var defaultConfig = &Config{
AdminNamespace: "kube-system",
ExecTimeoutSeconds: 30,
ReadBytesLimit: 51200,
SplitPattern: "",
RemovePatterns: "",
}

var reValidID = regexp.MustCompile("([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9]")
Expand Down Expand Up @@ -200,6 +206,32 @@ func (cfg *Config) Validate() error {
cfg.ParsedLabelSelector = labels.Set(parsed)
}

if cfg.SplitPattern != "" {
cfg.ParsedSplitPattern = []string{}
values := strings.Split(cfg.SplitPattern, ",")

for _, ele := range values {
if len(ele) == 0 {
// trailing or double ,,
continue
}
cfg.ParsedSplitPattern = append(cfg.ParsedSplitPattern, ele)
}
}

if cfg.RemovePatterns != "" {
cfg.ParsedRemovePatterns = []string{}
values := strings.Split(cfg.RemovePatterns, ",")

for _, ele := range values {
if len(ele) == 0 {
// trailing or double ,,
continue
}
cfg.ParsedRemovePatterns = append(cfg.ParsedRemovePatterns, ele)
}
}

return nil
}

Expand Down Expand Up @@ -255,6 +287,8 @@ func (cfg *Config) ParseFlags(args []string) error {
app.Flag("exec-timeout", "Timeout duration (in seconds) for exec command during validation").Default(strconv.Itoa(defaultConfig.ExecTimeoutSeconds)).IntVar(&cfg.ExecTimeoutSeconds)

app.Flag("container-bytes-limit", "read_bytes_limit_per_second parameter for tail plugin per container file. Default 2MB/min").Default(strconv.Itoa(defaultConfig.ReadBytesLimit)).IntVar(&cfg.ReadBytesLimit)
app.Flag("split-pattern", "List of pattern to be process without capacity limit. If empty, not pattern go to custom config").Default(defaultConfig.SplitPattern).StringVar(&cfg.SplitPattern)
app.Flag("remove-pattern", "Remove this unwanted pattern from tail for /var/log/containers/** folders").Default(defaultConfig.RemovePatterns).StringVar(&cfg.RemovePatterns)

_, err := app.Parse(args)

Expand Down
23 changes: 16 additions & 7 deletions config-reloader/generator/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,22 +329,31 @@ func (g *generatorInstance) updateStatus(ctx context.Context, namespace string,
g.su.UpdateStatus(ctx, namespace, status)
}

func workerplus(n int) string {
return fmt.Sprintf("%d", 1+n)
}

func (g *generatorInstance) renderIncludableFile(templateFile string, dest string) (err error) {
tmpl, err := template.New(filepath.Base(templateFile)).ParseFiles(templateFile)
funcMap := template.FuncMap{"workerplus": workerplus}
tmpl, err := template.New(filepath.Base(templateFile)).Funcs(funcMap).ParseFiles(templateFile)
if err != nil {
logrus.Warnf("Error processing template file %s: %+v", templateFile, err)
return err
}

// this is the model for the includable files
model := struct {
ID string
PrometheusEnabled bool
ReadBytesLimit int
ID string
PrometheusEnabled bool
ReadBytesLimit int
ParsedSplitPattern []string
RemovePatterns []string
}{
ID: util.MakeFluentdSafeName(g.cfg.ID),
PrometheusEnabled: g.cfg.PrometheusEnabled,
ReadBytesLimit: g.cfg.ReadBytesLimit,
ID: util.MakeFluentdSafeName(g.cfg.ID),
PrometheusEnabled: g.cfg.PrometheusEnabled,
ReadBytesLimit: g.cfg.ReadBytesLimit,
ParsedSplitPattern: g.cfg.ParsedSplitPattern,
RemovePatterns: g.cfg.ParsedRemovePatterns,
}

err = util.TemplateAndWriteFile(tmpl, model, dest)
Expand Down
7 changes: 6 additions & 1 deletion config-reloader/processors/mounted_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,12 @@ func (state *mountedFileState) convertToFragement(cf *ContainerFile) fluentd.Fra
makeDefaultParseDirective(),
}
}
res = append(res, dir, state.makeAttachK8sMetadataDirective(tag, mc, cf))
workerDirective := &fluentd.Directive{
Name: "worker",
Tag: "1",
Nested: []*fluentd.Directive{dir},
}
res = append(res, workerDirective, state.makeAttachK8sMetadataDirective(tag, mc, cf))
break
}
}
Expand Down
12 changes: 6 additions & 6 deletions config-reloader/processors/mounted_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func TestConvertToFragment(t *testing.T) {
result := state.convertToFragement(specC1)
assert.Equal(t, 2, len(result))

dir := result[0]
dir := result[0].Nested[0]

assert.Equal(t, "source", dir.Name)
assert.Equal(t, "tail", dir.Type())
Expand All @@ -240,7 +240,7 @@ func TestConvertToFragment(t *testing.T) {
result = state.convertToFragement(specC2)
assert.Equal(t, 2, len(result))

dir = result[0]
dir = result[0].Nested[0]

assert.Equal(t, "source", dir.Name)
assert.Equal(t, "tail", dir.Type())
Expand All @@ -254,7 +254,7 @@ func TestConvertToFragment(t *testing.T) {
result = state.convertToFragement(specC3)
assert.Equal(t, 2, len(result))

dir = result[0]
dir = result[0].Nested[0]

assert.Equal(t, "source", dir.Name)
assert.Equal(t, "tail", dir.Type())
Expand Down Expand Up @@ -363,9 +363,9 @@ func TestProcessMountedFile(t *testing.T) {
prep, err := Prepare(input, ctx, state)
assert.Nil(t, err)
assert.Equal(t, 6, len(prep))
assert.Equal(t, "/kubelet-root/pods/123-id/volumes/kubernetes.io~empty-dir/logs/redis.log", prep[0].Param("path"))
assert.Equal(t, "/kubelet-root/pods/abc-id/volumes/kubernetes.io~empty-dir/logs/nginx.log", prep[2].Param("path"))
assert.Equal(t, "/kubelet-root/pods/abc-sub-id/volumes/kubernetes.io~empty-dir/logs/files/nginx.log", prep[4].Param("path"))
assert.Equal(t, "/kubelet-root/pods/123-id/volumes/kubernetes.io~empty-dir/logs/redis.log", prep[0].Nested[0].Param("path"))
assert.Equal(t, "/kubelet-root/pods/abc-id/volumes/kubernetes.io~empty-dir/logs/nginx.log", prep[2].Nested[0].Param("path"))
assert.Equal(t, "/kubelet-root/pods/abc-sub-id/volumes/kubernetes.io~empty-dir/logs/files/nginx.log", prep[4].Nested[0].Param("path"))

payload := prep.String()
assert.True(t, strings.Contains(payload, "'container_image'=>'image-c2'"))
Expand Down
2 changes: 1 addition & 1 deletion config-reloader/templates/fluent.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<system>
log_level {{ .FluentdLogLevel }}

workers 2
# needed to enable /api/config.reload
rpc_endpoint 127.0.0.1:24444
</system>
Expand Down
Loading