From efffa06ccc67b9c93ff43394f23ef9b35a887a51 Mon Sep 17 00:00:00 2001 From: IldarMinaev Date: Mon, 7 Aug 2017 13:04:33 +0400 Subject: [PATCH] Fix 'Add_field' issue --- lib/logstash/filters/metrics.rb | 150 ++++++++++++++++++++++++++++++-- 1 file changed, 145 insertions(+), 5 deletions(-) diff --git a/lib/logstash/filters/metrics.rb b/lib/logstash/filters/metrics.rb index c5e4980..74d1ec0 100644 --- a/lib/logstash/filters/metrics.rb +++ b/lib/logstash/filters/metrics.rb @@ -2,6 +2,7 @@ require "securerandom" require "logstash/filters/base" require "logstash/namespace" +require "thread" # The metrics filter is useful for aggregating metrics. # @@ -165,10 +166,29 @@ def register end @metric_meters = ThreadSafe::Cache.new { |h,k| h[k] = Metriks.meter metric_key(k) } @metric_timers = ThreadSafe::Cache.new { |h,k| h[k] = Metriks.timer metric_key(k) } + #Minaev Ildar - this is patch to 'add_field' and 'add_tag" work properly + @metric_fields = ThreadSafe::Cache.new { |h,k| h[k] = Hash.new metric_key(k)} + @metric_tags = ThreadSafe::Cache.new { ThreadSafe::Array.new } + @metric_add_fields = ThreadSafe::Cache.new + @metric_add_tags = ThreadSafe::Array.new + @semaphore = Mutex.new + @add_field.each do |field, value| + @metric_add_fields[field]=value + end + @add_field = {} + + @add_tag.each do |tag| + @metric_add_tags << tag + end + @add_tag = [] end # def register def filter(event) + return unless filter?(event) + #Minaev Ildar:if type is empty - the event is generated by our flush method, so skip it! + return unless event.get("type") + # TODO(piavlo): This should probably be moved to base filter class. if @ignore_older_than > 0 && Time.now - event.timestamp.time > @ignore_older_than @@ -178,10 +198,64 @@ def filter(event) @meter.each do |m| @metric_meters[event.sprintf(m)].mark + @metric_tags[event.sprintf(m)].clear + @metric_add_tags.each do |tag| + new_tag = event.sprintf(tag) + @metric_tags[event.sprintf(m)] = @metric_tags[event.sprintf(m)].push(new_tag) unless @metric_tags[event.sprintf(m)].include?(new_tag) + end + + new_event = LogStash::Event.new + @semaphore.synchronize { + @metric_fields[event.sprintf(m)].clear + @metric_add_fields.each_pair do |field, value| + new_field = event.sprintf(field) + new_value = [value] if !value.is_a?(Array) + new_value.each do |v| + v = event.sprintf(v) + if event.include?(new_field) && (event.get(new_field) != v) + #new_event.set(new_field, [event.get(new_field)] if !event.get(new_field).is_a?(Array)) + if !event.get(new_field).is_a?(Array) + new_event.set(new_field, [event.get(new_field)]) + end + new_event.set(new_field, new_event.get(new_field) << v) + else + new_event.set(new_field,v) + end + @metric_fields[event.sprintf(m)][new_field] = new_event.get(new_field) + end + end + } end @timer.each do |name, value| @metric_timers[event.sprintf(name)].update(event.sprintf(value).to_f) + @metric_tags[event.sprintf(name)].clear + @metric_add_tags.each do |tag| + new_tag = event.sprintf(tag) + @metric_tags[event.sprintf(name)] = @metric_tags[event.sprintf(name)].push(new_tag) unless @metric_tags[event.sprintf(name)].include?(new_tag) + end + + new_event = LogStash::Event.new + @semaphore.synchronize { + @metric_fields[event.sprintf(name)].clear + @metric_add_fields.each_pair do |field, value| + new_field = event.sprintf(field) + new_value = [value] if !value.is_a?(Array) + new_value.each do |v| + v = event.sprintf(v) + if event.include?(new_field) && (event.get(new_field) != v) + #new_event.set(new_field, [event.get(new_field)] if !event.get(new_field).is_a?(Array)) + if !event.get(new_field).is_a?(Array) + new_event.set(new_field, [event.get(new_field)]) + end + new_event.set(new_field, new_event.get(new_field) << v) + else + new_event.set(new_field,v) + end + @metric_fields[event.sprintf(name)][new_field] = new_event.get(new_field) + end + end + } end end # def filter @@ -194,14 +268,47 @@ def flush(options = {}) # Do nothing if there's nothing to do ;) return unless should_flush? - event = LogStash::Event.new - event.set("message", @host) + events = [] + @metric_meters.each_pair do |name, metric| + event = LogStash::Event.new + event.set("message", @host) flush_rates event, name, metric metric.clear if should_clear? + @semaphore.synchronize { + @metric_fields[name].each_pair do |field, value| + field=event.sprintf(field) + val = [value] if !value.is_a?(Array) + val.each do |v| + if event.include?(v) && (event.get(field) != v) + if !event.get(v).is_a?(Array) + event.set(field, event.get(v)) + else + event.set(field, event.get(field) << v) + end + else + event.set(field,v) + end + end + end + } + @metric_tags[name].each do |tag| + #event.set("tags", event.get("tags") ||= []) + if !event.get("tags") || event.get("tags").empty? + event.set("tags", []) + end + #event.set("tags", event.get("tags").push(tag) unless event.get("tags").include?(tag)) + if !event.get("tags").include?(tag) + event.set("tags", event.get("tags").push(tag)) + end + end + filter_matched(event) + events << event end @metric_timers.each_pair do |name, metric| + event = LogStash::Event.new + event.set("message", @host) flush_rates event, name, metric # These 4 values are not sliding, so they probably are not useful. event.set("[#{name}][min]", metric.min) @@ -214,6 +321,36 @@ def flush(options = {}) event.set("[#{name}][p#{percentile}]", metric.snapshot.value(percentile / 100.0)) end metric.clear if should_clear? + @semaphore.synchronize { + @metric_fields[name].each_pair do |field, value| + field=event.sprintf(field) + val = [value] if !value.is_a?(Array) + val.each do |v| + if event.include?(v) && (event.get(field) != v) + if !event.get(v).is_a?(Array) + event.set(field, event.get(v)) + else + event.set(field, event.get(field) << v) + end + else + event.set(field,v) + end + end + end + } + @metric_tags[name].each do |tag| + #event.set("tags", event.get("tags") ||= []) + if !event.get("tags") || event.get("tags").empty? + event.set("tags", []) + end + #event.set("tags", event.get("tags").push(tag) unless event.get("tags").include?(tag)) + if !event.get("tags").include?(tag) + event.set("tags", event.get("tags").push(tag)) + end + end + filter_matched(event) + + events << event end # Reset counter since metrics were flushed @@ -224,10 +361,13 @@ def flush(options = {}) @last_clear.value = 0 @metric_meters.clear @metric_timers.clear + @semaphore.synchronize { + @metric_fields.clear + } + @metric_tags.clear end - filter_matched(event) - return [event] + return events end # this is a temporary fix to enable periodic flushes without using the plugin config: @@ -259,4 +399,4 @@ def should_flush? def should_clear? @clear_interval > 0 && @last_clear.value >= @clear_interval end -end # class LogStash::Filters::Metrics +end # class LogStash::Filters::Metrics \ No newline at end of file