Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 145 additions & 5 deletions lib/logstash/filters/metrics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
require "securerandom"
require "logstash/filters/base"
require "logstash/namespace"
require "thread"

# The metrics filter is useful for aggregating metrics.
#
Expand Down Expand Up @@ -165,10 +166,29 @@ def register
end
@metric_meters = ThreadSafe::Cache.new { |h,k| h[k] = Metriks.meter metric_key(k) }
@metric_timers = ThreadSafe::Cache.new { |h,k| h[k] = Metriks.timer metric_key(k) }
#Minaev Ildar - this is patch to 'add_field' and 'add_tag" work properly
@metric_fields = ThreadSafe::Cache.new { |h,k| h[k] = Hash.new metric_key(k)}
@metric_tags = ThreadSafe::Cache.new { ThreadSafe::Array.new }
@metric_add_fields = ThreadSafe::Cache.new
@metric_add_tags = ThreadSafe::Array.new
@semaphore = Mutex.new
@add_field.each do |field, value|
@metric_add_fields[field]=value
end
@add_field = {}

@add_tag.each do |tag|
@metric_add_tags << tag
end
@add_tag = []
end # def register

def filter(event)

return unless filter?(event)
#Minaev Ildar:if type is empty - the event is generated by our flush method, so skip it!
return unless event.get("type")


# TODO(piavlo): This should probably be moved to base filter class.
if @ignore_older_than > 0 && Time.now - event.timestamp.time > @ignore_older_than
Expand All @@ -178,10 +198,64 @@ def filter(event)

@meter.each do |m|
@metric_meters[event.sprintf(m)].mark
@metric_tags[event.sprintf(m)].clear
@metric_add_tags.each do |tag|
new_tag = event.sprintf(tag)
@metric_tags[event.sprintf(m)] = @metric_tags[event.sprintf(m)].push(new_tag) unless @metric_tags[event.sprintf(m)].include?(new_tag)
end

new_event = LogStash::Event.new
@semaphore.synchronize {
@metric_fields[event.sprintf(m)].clear
@metric_add_fields.each_pair do |field, value|
new_field = event.sprintf(field)
new_value = [value] if !value.is_a?(Array)
new_value.each do |v|
v = event.sprintf(v)
if event.include?(new_field) && (event.get(new_field) != v)
#new_event.set(new_field, [event.get(new_field)] if !event.get(new_field).is_a?(Array))
if !event.get(new_field).is_a?(Array)
new_event.set(new_field, [event.get(new_field)])
end
new_event.set(new_field, new_event.get(new_field) << v)
else
new_event.set(new_field,v)
end
@metric_fields[event.sprintf(m)][new_field] = new_event.get(new_field)
end
end
}
end

@timer.each do |name, value|
@metric_timers[event.sprintf(name)].update(event.sprintf(value).to_f)
@metric_tags[event.sprintf(name)].clear
@metric_add_tags.each do |tag|
new_tag = event.sprintf(tag)
@metric_tags[event.sprintf(name)] = @metric_tags[event.sprintf(name)].push(new_tag) unless @metric_tags[event.sprintf(name)].include?(new_tag)
end

new_event = LogStash::Event.new
@semaphore.synchronize {
@metric_fields[event.sprintf(name)].clear
@metric_add_fields.each_pair do |field, value|
new_field = event.sprintf(field)
new_value = [value] if !value.is_a?(Array)
new_value.each do |v|
v = event.sprintf(v)
if event.include?(new_field) && (event.get(new_field) != v)
#new_event.set(new_field, [event.get(new_field)] if !event.get(new_field).is_a?(Array))
if !event.get(new_field).is_a?(Array)
new_event.set(new_field, [event.get(new_field)])
end
new_event.set(new_field, new_event.get(new_field) << v)
else
new_event.set(new_field,v)
end
@metric_fields[event.sprintf(name)][new_field] = new_event.get(new_field)
end
end
}
end
end # def filter

Expand All @@ -194,14 +268,47 @@ def flush(options = {})
# Do nothing if there's nothing to do ;)
return unless should_flush?

event = LogStash::Event.new
event.set("message", @host)
events = []

@metric_meters.each_pair do |name, metric|
event = LogStash::Event.new
event.set("message", @host)
flush_rates event, name, metric
metric.clear if should_clear?
@semaphore.synchronize {
@metric_fields[name].each_pair do |field, value|
field=event.sprintf(field)
val = [value] if !value.is_a?(Array)
val.each do |v|
if event.include?(v) && (event.get(field) != v)
if !event.get(v).is_a?(Array)
event.set(field, event.get(v))
else
event.set(field, event.get(field) << v)
end
else
event.set(field,v)
end
end
end
}
@metric_tags[name].each do |tag|
#event.set("tags", event.get("tags") ||= [])
if !event.get("tags") || event.get("tags").empty?
event.set("tags", [])
end
#event.set("tags", event.get("tags").push(tag) unless event.get("tags").include?(tag))
if !event.get("tags").include?(tag)
event.set("tags", event.get("tags").push(tag))
end
end
filter_matched(event)
events << event
end

@metric_timers.each_pair do |name, metric|
event = LogStash::Event.new
event.set("message", @host)
flush_rates event, name, metric
# These 4 values are not sliding, so they probably are not useful.
event.set("[#{name}][min]", metric.min)
Expand All @@ -214,6 +321,36 @@ def flush(options = {})
event.set("[#{name}][p#{percentile}]", metric.snapshot.value(percentile / 100.0))
end
metric.clear if should_clear?
@semaphore.synchronize {
@metric_fields[name].each_pair do |field, value|
field=event.sprintf(field)
val = [value] if !value.is_a?(Array)
val.each do |v|
if event.include?(v) && (event.get(field) != v)
if !event.get(v).is_a?(Array)
event.set(field, event.get(v))
else
event.set(field, event.get(field) << v)
end
else
event.set(field,v)
end
end
end
}
@metric_tags[name].each do |tag|
#event.set("tags", event.get("tags") ||= [])
if !event.get("tags") || event.get("tags").empty?
event.set("tags", [])
end
#event.set("tags", event.get("tags").push(tag) unless event.get("tags").include?(tag))
if !event.get("tags").include?(tag)
event.set("tags", event.get("tags").push(tag))
end
end
filter_matched(event)

events << event
end

# Reset counter since metrics were flushed
Expand All @@ -224,10 +361,13 @@ def flush(options = {})
@last_clear.value = 0
@metric_meters.clear
@metric_timers.clear
@semaphore.synchronize {
@metric_fields.clear
}
@metric_tags.clear
end

filter_matched(event)
return [event]
return events
end

# this is a temporary fix to enable periodic flushes without using the plugin config:
Expand Down Expand Up @@ -259,4 +399,4 @@ def should_flush?
def should_clear?
@clear_interval > 0 && @last_clear.value >= @clear_interval
end
end # class LogStash::Filters::Metrics
end # class LogStash::Filters::Metrics