Skip to content
This repository was archived by the owner on Dec 15, 2021. It is now read-only.

Commit f1c3b6a

Browse files
author
Ahmed Ammar
committed
Update to support logstash 2.0.
1 parent 63c4dc9 commit f1c3b6a

File tree

3 files changed

+42
-26
lines changed

3 files changed

+42
-26
lines changed

lib/logstash-input-dynamodb_jars.rb

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
require_jar( 'com.amazonaws', 'aws-java-sdk-codedeploy', '1.10.11' )
1616
require_jar( 'com.amazonaws', 'aws-java-sdk-dynamodb', '1.10.10' )
1717
require_jar( 'com.amazonaws', 'aws-java-sdk-directconnect', '1.10.11' )
18-
require_jar( 'org.apache.httpcomponents', 'httpclient', '4.3.6' )
18+
require_jar( 'org.apache.httpcomponents', 'httpclient', '4.4.1' )
19+
require_jar( 'org.apache.httpcomponents', 'httpcore', '4.4.1' )
1920
require_jar( 'com.amazonaws', 'aws-java-sdk-sns', '1.10.11' )
2021
require_jar( 'com.amazonaws', 'aws-java-sdk-directory', '1.10.11' )
2122
require_jar( 'com.google.protobuf', 'protobuf-java', '2.6.1' )

lib/logstash/inputs/dynamodb.rb

+33-20
Original file line numberDiff line numberDiff line change
@@ -182,17 +182,20 @@ def register
182182

183183
public
184184
def run(logstash_queue)
185-
begin
186-
run_with_catch(logstash_queue)
187-
rescue LogStash::ShutdownSignal
188-
exit_threads
189-
until @queue.empty?
190-
@logger.info("Flushing rest of events in logstash queue")
191-
event = @queue.pop()
192-
queue_event(@parser.parse_stream(event), logstash_queue, @host)
193-
end # until [email protected]?
194-
end # begin
195-
end # def run(logstash_queue)
185+
$exit = false
186+
run_with_catch(logstash_queue)
187+
end
188+
189+
public
190+
def stop
191+
$exit = true
192+
exit_threads
193+
until @queue.empty?
194+
@logger.info("Flushing rest of events in logstash queue")
195+
event = @queue.pop()
196+
queue_event(@parser.parse_stream(event), logstash_queue, @host)
197+
end # until [email protected]?
198+
end
196199

197200
# Starts KCL app in a background thread
198201
# Starts parallel scan if need be in a background thread
@@ -278,12 +281,16 @@ def scan(logstash_queue)
278281
start_table_copy_thread
279282

280283
scan_queue = @logstash_writer.getQueue()
281-
while true
282-
event = scan_queue.take()
283-
if event.getEntry().nil? and event.getSize() == -1
284-
break
285-
end # if event.isEmpty()
286-
queue_event(@parser.parse_scan(event.getEntry(), event.getSize()), logstash_queue, @host)
284+
while !$exit
285+
if !scan_queue.empty?
286+
event = scan_queue.take()
287+
if event.getEntry().nil? and event.getSize() == -1
288+
break
289+
end # if event.isEmpty()
290+
queue_event(@parser.parse_scan(event.getEntry(), event.getSize()), logstash_queue, @host)
291+
else
292+
sleep(0.01)
293+
end
287294
end # while true
288295
end
289296

@@ -292,14 +299,20 @@ def stream(logstash_queue)
292299
@logger.info("Starting stream...")
293300
start_kcl_thread
294301

295-
while true
296-
event = @queue.pop()
297-
queue_event(@parser.parse_stream(event), logstash_queue, @host)
302+
while !$exit
303+
if !@queue.empty?
304+
event = @queue.pop()
305+
queue_event(@parser.parse_stream(event), logstash_queue, @host)
306+
else
307+
sleep(0.01)
308+
end
298309
end # while true
299310
end
300311

301312
private
302313
def exit_threads
314+
@worker.shutdown()
315+
303316
unless @dynamodb_scan_thread.nil?
304317
@dynamodb_scan_thread.exit
305318
end # unless @dynamodb_scan_thread.nil?

logstash-input-dynamodb.gemspec

+7-5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-input-dynamodb'
3-
s.version = '1.0.0'
3+
s.version = '1.0.1'
44
s.licenses = ['Apache License (2.0)']
55
s.summary = "This input plugin scans a specified DynamoDB table and then reads changes to a DynamoDB table from the associated DynamoDB Stream."
66
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"
@@ -19,9 +19,11 @@ Gem::Specification.new do |s|
1919
s.metadata = { "logstash_plugin" => "true", "logstash_group" => "input" }
2020

2121
# Gem dependencies
22-
s.add_runtime_dependency "logstash-core", '>= 1.4.0', '< 2.0.0'
23-
s.add_runtime_dependency "logstash-codec-json"
22+
s.add_runtime_dependency "logstash-core", ">= 2.0.0", "< 3.0.0"
23+
s.add_runtime_dependency 'logstash-codec-json'
24+
s.add_runtime_dependency 'stud', '>= 0.0.22'
2425
s.add_runtime_dependency "activesupport-json_encoder"
26+
s.add_development_dependency 'logstash-devutils', '>= 0.0.16'
2527
# Jar dependencies
2628
s.requirements << "jar 'com.amazonaws:aws-java-sdk-elasticbeanstalk', '1.10.11'"
2729
s.requirements << "jar 'com.amazonaws:aws-java-sdk-ses', '1.10.11' "
@@ -37,7 +39,8 @@ Gem::Specification.new do |s|
3739
s.requirements << "jar 'com.amazonaws:aws-java-sdk-codedeploy', '1.10.11'"
3840
s.requirements << "jar 'com.amazonaws:aws-java-sdk-dynamodb', '1.10.10'"
3941
s.requirements << "jar 'com.amazonaws:aws-java-sdk-directconnect', '1.10.11'"
40-
s.requirements << "jar 'org.apache.httpcomponents:httpclient', '4.3.6'"
42+
s.requirements << "jar 'org.apache.httpcomponents:httpclient', '4.4.1'"
43+
s.requirements << "jar 'org.apache.httpcomponents:httpcore', '4.4.1'"
4144
s.requirements << "jar 'com.amazonaws:aws-java-sdk-sns', '1.10.11'"
4245
s.requirements << "jar 'com.amazonaws:aws-java-sdk-directory', '1.10.11'"
4346
s.requirements << "jar 'com.google.protobuf:protobuf-java', '2.6.1'"
@@ -95,6 +98,5 @@ Gem::Specification.new do |s|
9598
s.requirements << "jar 'com.amazonaws:aws-java-sdk-cloudwatch', '1.10.8'"
9699
s.add_runtime_dependency 'jar-dependencies'
97100
# Development dependencies
98-
s.add_development_dependency "logstash-devutils"
99101
s.add_development_dependency "mocha"
100102
end

0 commit comments

Comments
 (0)