diff --git a/.gitignore b/.gitignore index 6f6ae30..37a849c 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,4 @@ Gemfile.lock .bundle vendor -derby.log +derby.log \ No newline at end of file diff --git a/README.md b/README.md index fefad1c..040c0fc 100755 --- a/README.md +++ b/README.md @@ -97,6 +97,13 @@ Reading data from MySQL: jdbc_password => "password" # or jdbc_password_filepath => "/path/to/my/password_file" statement => "SELECT ..." + use_column_value => true + tracking_column => tracking_number + last_run_storage => "zookeeper" + # or last_run_storage => "file" + # last_run_metadata_path => "/path/to/last_run_metadata_path" + last_run_zookeeper_path => "/last_run_zookeeper_path" + zk_ip_list => "zookeeper_host:zookeeper_port" jdbc_paging_enabled => "true" jdbc_page_size => "50000" } diff --git a/lib/logstash/inputs/jdbc.rb b/lib/logstash/inputs/jdbc.rb index aa47c0f..487b86d 100755 --- a/lib/logstash/inputs/jdbc.rb +++ b/lib/logstash/inputs/jdbc.rb @@ -157,9 +157,21 @@ module LogStash module Inputs class Jdbc < LogStash::Inputs::Base # exactly once. config :schedule, :validate => :string + # last run time storage ('file', 'zookeeper') + config :last_run_storage, :validate => :string, :default => "file" + # Path to file with last run time config :last_run_metadata_path, :validate => :string, :default => "#{ENV['HOME']}/.logstash_jdbc_last_run" + # Path to zookeeper node with last run time + config :last_run_zookeeper_path, :validate => :string, :default => "/logstash_input_jdbc_last_run" + + # Zookeeper ip list + config :zk_ip_list, :validate => :string, :default => "localhost:2181" + + # Znode we created is permanent or ephemeral. + config :zk_ephemeral, :validate => :boolean, :default => false + # Use an incremental column value rather than a timestamp config :use_column_value, :validate => :boolean, :default => false @@ -213,7 +225,8 @@ def register end end - set_value_tracker(LogStash::PluginMixins::Jdbc::ValueTracking.build_last_value_tracker(self)) + init_value_tracker + set_statement_logger(LogStash::PluginMixins::Jdbc::CheckedCountLogger.new(@logger)) @enable_encoding = !@charset.nil? || !@columns_charset.empty? @@ -242,6 +255,16 @@ def register end end # def register + def init_value_tracker + if @last_run_storage.downcase == "file" + set_value_tracker(LogStash::PluginMixins::Jdbc::ValueTracking.build_last_value_tracker(self)) + else + if @last_run_storage.downcase == "zookeeper" + set_value_tracker(LogStash::PluginMixins::Jdbc::ValueTrackingZookeeper.build_last_value_tracker(self)) + end + end + end + # test injection points def set_statement_logger(instance) @statement_logger = instance @@ -273,7 +296,14 @@ def stop def execute_query(queue) # update default parameters - @parameters['sql_last_value'] = @value_tracker.value + if @last_run_storage.downcase == "file" + @parameters['sql_last_value'] = @value_tracker.value + else + if @last_run_storage.downcase == "zookeeper" + @parameters['sql_last_value'] = @value_tracker.read_value + end + end + @event_sent = false execute_statement(@statement, @parameters) do |row| if enable_encoding? ## do the necessary conversions to string elements @@ -282,8 +312,15 @@ def execute_query(queue) event = LogStash::Event.new(row) decorate(event) queue << event + @event_sent = true + end + begin + # save value if it's not the same as previous + @value_tracker.write if @parameters['sql_last_value'] != @value_tracker.value && @event_sent + rescue => e + @logger.error("Failed to write last value", :exception => e) + stop end - @value_tracker.write end private diff --git a/lib/logstash/plugin_mixins/jdbc/jdbc.rb b/lib/logstash/plugin_mixins/jdbc/jdbc.rb index da524ee..cfb4bb8 100644 --- a/lib/logstash/plugin_mixins/jdbc/jdbc.rb +++ b/lib/logstash/plugin_mixins/jdbc/jdbc.rb @@ -4,6 +4,7 @@ require "time" require "date" require_relative "value_tracking" +require_relative "value_tracking_zookeeper" require_relative "checked_count_logger" java_import java.util.concurrent.locks.ReentrantLock diff --git a/lib/logstash/plugin_mixins/jdbc/value_tracking_zookeeper.rb b/lib/logstash/plugin_mixins/jdbc/value_tracking_zookeeper.rb new file mode 100644 index 0000000..b4e317b --- /dev/null +++ b/lib/logstash/plugin_mixins/jdbc/value_tracking_zookeeper.rb @@ -0,0 +1,149 @@ +# encoding: utf-8 +require "zk" + +module LogStash module PluginMixins module Jdbc + class ValueTrackingZookeeper + + def self.build_last_value_tracker(plugin) + if plugin.use_column_value && plugin.tracking_column_type == "numeric" + # use this irrespective of the jdbc_default_timezone setting + klass = NumericValueTrackerZK + else + if plugin.jdbc_default_timezone.nil? || plugin.jdbc_default_timezone.empty? + # no TZ stuff for Sequel, use Time + klass = TimeValueTrackerZK + else + # Sequel does timezone handling on DateTime only + klass = DateTimeValueTrackerZK + end + end + + handler = NullNodeHandler.new(plugin.last_run_zookeeper_path) + if plugin.record_last_run + handler = NodeHandler.new(plugin) + end + if plugin.clean_run + handler.clean + end + instance = klass.new(handler) + return instance + end + + attr_reader :value + + def initialize(handler) + @node_handler = handler + set_value(read_value) + end + + def read_value + # override in subclass + end + + def set_value(value) + # override in subclass + end + + def write + @node_handler.write(@value) + end + end + + + class NumericValueTrackerZK < ValueTrackingZookeeper + def read_value + @val = @node_handler.read + return 0 if @val.nil? + @val.to_f.round(0) + end + + def set_value(value) + return unless value.is_a?(Numeric) + @value = value + end + end + + class DateTimeValueTrackerZK < ValueTrackingZookeeper + def read_value + @node_handler.read || DateTime.new(1970) + end + + def set_value(value) + if value.respond_to?(:to_datetime) + @value = value.to_datetime + else + @value = DateTime.parse(value) + end + end + end + + class TimeValueTrackerZK < ValueTrackingZookeeper + def read_value + @node_handler.read || Time.at(0).utc + end + + def set_value(value) + if value.respond_to?(:to_time) + @value = value.to_time + else + @value = DateTime.parse(value).to_time + end + end + end + + class NodeHandler + def initialize(plugin) + @path = plugin.last_run_zookeeper_path + @zk_ip_list = plugin.zk_ip_list + @zk_ephemeral = plugin.zk_ephemeral + + @zk = ZK.new(@zk_ip_list) + @exists = @zk.exists?(@path) + create_node + end + + def clean + return unless @exists + @zk.delete(@path) + @exists = false + end + + def read + return unless @exists + @zk.get(@path).first + end + + def set_initial(initial) + @initial = initial + end + + def create_node + unless @exists + if @zk_ephemeral + @zk.create(@path, :ephemeral => true) + else + @zk.create(@path) + end + @exists = true + end + end + + def write(value) + @zk.set(@path, value.to_s) + end + end + + class NullNodeHandler + def initialize(path) + end + + def clean + end + + def read + end + + def write(value) + end + end +end end end diff --git a/logstash-input-jdbc.gemspec b/logstash-input-jdbc.gemspec index 6d3a13e..e544484 100755 --- a/logstash-input-jdbc.gemspec +++ b/logstash-input-jdbc.gemspec @@ -24,6 +24,7 @@ Gem::Specification.new do |s| s.add_runtime_dependency 'tzinfo' s.add_runtime_dependency 'tzinfo-data' s.add_runtime_dependency 'rufus-scheduler' + s.add_runtime_dependency "zk", ">= 1.9.6" s.add_development_dependency 'logstash-devutils' s.add_development_dependency 'timecop'