Skip to content
This repository was archived by the owner on Dec 15, 2021. It is now read-only.

Commit 7837c27

Browse files
author
Brandon Tom
committed
Added more Elasticsearch changes. Moved around some classes. Changed require_jar to pull everything from a single file.
1 parent f43499b commit 7837c27

9 files changed

+249
-251
lines changed

lib/logstash/inputs/DynamoDBLogParser.rb

+125-123
Original file line number | Diff line number | Diff line change
@@ -19,146 +19,148 @@
1919
require 'bigdecimal'
2020
require 'activesupport/json_encoder'
2121
require 'base64'
22-
begin
23-
require 'jar-dependencies'
24-
require_jar( 'com.amazonaws', 'aws-java-sdk-dynamodb', '1.10.10' )
25-
require_jar( 'com.amazonaws', 'dynamodb-import-export-tool', '1.0.0' )
26-
end
22+
23+
require "logstash-input-dynamodb_jars"
2724
java_import "com.fasterxml.jackson.databind.ObjectMapper"
2825
java_import "com.amazonaws.services.dynamodbv2.model.AttributeValue"
2926
java_import "com.amazonaws.dynamodb.bootstrap.AttributeValueMixIn"
3027

31-
class DynamoDBLogParser
28+
module Logstash
29+
module Inputs
30+
module DynamoDB
31+
class DynamoDBLogParser
3232

33-
MAX_NUMBER_OF_BYTES_FOR_NUMBER = 21;
33+
MAX_NUMBER_OF_BYTES_FOR_NUMBER = 21;
3434

35-
def initialize(view_type, log_format, key_schema, region)
36-
@view_type = view_type
37-
@log_format = log_format
38-
@mapper ||= ObjectMapper.new()
39-
@mapper.setSerializationInclusion(JsonInclude::Include::NON_NULL)
40-
@mapper.addMixInAnnotations(AttributeValue, AttributeValueMixIn);
41-
@key_schema = key_schema
42-
ActiveSupport.encode_big_decimal_as_string = false
43-
@hash_template = Hash.new
44-
@hash_template["eventID"] = "0"
45-
@hash_template["eventName"] = "INSERT"
46-
@hash_template["eventVersion"] = "1.0"
47-
@hash_template["eventSource"] = "aws:dynamodb"
48-
@hash_template["awsRegion"] = region
49-
end
35+
def initialize(view_type, log_format, key_schema, region)
36+
@view_type = view_type
37+
@log_format = log_format
38+
@mapper ||= ObjectMapper.new()
39+
@mapper.setSerializationInclusion(JsonInclude::Include::NON_NULL)
40+
@mapper.addMixInAnnotations(AttributeValue, AttributeValueMixIn);
41+
@key_schema = key_schema
42+
ActiveSupport.encode_big_decimal_as_string = false
43+
@hash_template = Hash.new
44+
@hash_template["eventID"] = "0"
45+
@hash_template["eventName"] = "INSERT"
46+
@hash_template["eventVersion"] = "1.0"
47+
@hash_template["eventSource"] = "aws:dynamodb"
48+
@hash_template["awsRegion"] = region
49+
end
5050

51-
public
52-
def parse_scan(log, new_image_size)
53-
data_hash = JSON.parse(@mapper.writeValueAsString(log))
51+
public
52+
def parse_scan(log, new_image_size)
53+
data_hash = JSON.parse(@mapper.writeValueAsString(log))
5454

55-
@hash_template["dynamodb"] = Hash.new
56-
@hash_template["dynamodb"]["keys"] = Hash.new
57-
size_bytes = calculate_key_size_in_bytes(log)
58-
@key_schema.each { |x|
59-
@hash_template["dynamodb"]["keys"][x] = data_hash[x]
60-
}
61-
unless @view_type == "keys_only"
62-
size_bytes += new_image_size
63-
@hash_template["dynamodb"]["newImage"] = data_hash
64-
end
65-
@hash_template["dynamodb"]["sequenceNumber"] = "0"
66-
@hash_template["dynamodb"]["sizeBytes"] = size_bytes
67-
@hash_template["dynamodb"]["streamViewType"] = @view_type.upcase
55+
@hash_template["dynamodb"] = Hash.new
56+
@hash_template["dynamodb"]["keys"] = Hash.new
57+
size_bytes = calculate_key_size_in_bytes(log)
58+
@key_schema.each { |x|
59+
@hash_template["dynamodb"]["keys"][x] = data_hash[x]
60+
}
61+
unless @view_type == "keys_only"
62+
size_bytes += new_image_size
63+
@hash_template["dynamodb"]["newImage"] = data_hash
64+
end
65+
@hash_template["dynamodb"]["sequenceNumber"] = "0"
66+
@hash_template["dynamodb"]["sizeBytes"] = size_bytes
67+
@hash_template["dynamodb"]["streamViewType"] = @view_type.upcase
6868

69-
return parse_view_type(@hash_template)
70-
end
69+
return parse_view_type(@hash_template)
70+
end
7171

72-
public
73-
def parse_stream(log)
74-
return parse_view_type(JSON.parse(@mapper.writeValueAsString(log))["internalObject"])
75-
end
72+
public
73+
def parse_stream(log)
74+
return parse_view_type(JSON.parse(@mapper.writeValueAsString(log))["internalObject"])
75+
end
7676

77-
private
78-
def calculate_key_size_in_bytes(record)
79-
key_size = 0
80-
@key_schema.each { |x|
81-
key_size += x.length
82-
value = record.get(x)
83-
if not value.getB().nil?
84-
b = value.getB();
85-
key_size += Base64.decode64(b).length
86-
elsif not value.getS().nil?
87-
s = value.getS();
88-
key_size += s.length;
89-
elsif not value.getN().nil?
90-
key_size += MAX_NUMBER_OF_BYTES_FOR_NUMBER;
91-
end
92-
}
93-
return key_size
94-
end
77+
private
78+
def calculate_key_size_in_bytes(record)
79+
key_size = 0
80+
@key_schema.each { |x|
81+
key_size += x.length
82+
value = record.get(x)
83+
if !(value.getB().nil?)
84+
b = value.getB();
85+
key_size += Base64.decode64(b).length
86+
elsif !(value.getS().nil?)
87+
s = value.getS();
88+
key_size += s.length;
89+
elsif !(value.getN().nil?)
90+
key_size += MAX_NUMBER_OF_BYTES_FOR_NUMBER;
91+
end
92+
}
93+
return key_size
94+
end
9595

96-
private
97-
def parse_view_type(hash)
98-
if @log_format == LogStash::Inputs::DynamoDB::LF_PLAIN
99-
return hash.to_json
100-
end
101-
case @view_type
102-
when LogStash::Inputs::DynamoDB::VT_KEYS_ONLY
103-
return parse_format(hash["dynamodb"]["keys"])
104-
when LogStash::Inputs::DynamoDB::VT_OLD_IMAGE
105-
return parse_format(hash["dynamodb"]["oldImage"])
106-
when LogStash::Inputs::DynamoDB::VT_NEW_IMAGE
107-
return parse_format(hash["dynamodb"]["newImage"]) #check new and old, dynamodb.
108-
end
109-
end
96+
private
97+
def parse_view_type(hash)
98+
if @log_format == LogStash::Inputs::DynamoDB::LF_PLAIN
99+
return hash.to_json
100+
end
101+
case @view_type
102+
when LogStash::Inputs::DynamoDB::VT_KEYS_ONLY
103+
return parse_format(hash["dynamodb"]["keys"])
104+
when LogStash::Inputs::DynamoDB::VT_OLD_IMAGE
105+
return parse_format(hash["dynamodb"]["oldImage"])
106+
when LogStash::Inputs::DynamoDB::VT_NEW_IMAGE
107+
return parse_format(hash["dynamodb"]["newImage"]) #check new and old, dynamodb.
108+
end
109+
end
110110

111-
private
112-
def parse_format(hash)
113-
if @log_format == LogStash::Inputs::DynamoDB::LF_DYNAMODB
114-
return hash.to_json
115-
else
116-
return dynamodb_to_json(hash)
117-
end
118-
end
111+
private
112+
def parse_format(hash)
113+
if @log_format == LogStash::Inputs::DynamoDB::LF_DYNAMODB
114+
return hash.to_json
115+
else
116+
return dynamodb_to_json(hash)
117+
end
118+
end
119119

120-
private
121-
def dynamodb_to_json(hash)
122-
return formatAttributeValueMap(hash).to_json
123-
end
120+
private
121+
def dynamodb_to_json(hash)
122+
return formatAttributeValueMap(hash).to_json
123+
end
124124

125-
private
126-
def formatAttributeValueMap(hash)
127-
keys_to_delete = []
128-
hash.each do |k, v|
129-
dynamodb_key = v.keys.first
130-
dynamodb_value = v.values.first
131-
if @log_format == LogStash::Inputs::DynamoDB::LF_JSON_NO_BIN and (dynamodb_key == "BS" or dynamodb_key == "B")
132-
keys_to_delete.push(k) # remove binary values and binary sets
133-
next
134-
end
135-
hash[k] = formatAttributeValue(v.keys.first, v.values.first)
136-
end
137-
keys_to_delete.each {|key| hash.delete(key)}
138-
return hash
139-
end
125+
private
126+
def formatAttributeValueMap(hash)
127+
keys_to_delete = []
128+
hash.each do |k, v|
129+
dynamodb_key = v.keys.first
130+
dynamodb_value = v.values.first
131+
if @log_format == LogStash::Inputs::DynamoDB::LF_JSON_NO_BIN and (dynamodb_key == "BS" or dynamodb_key == "B")
132+
keys_to_delete.push(k) # remove binary values and binary sets
133+
next
134+
end
135+
hash[k] = formatAttributeValue(v.keys.first, v.values.first)
136+
end
137+
keys_to_delete.each {|key| hash.delete(key)}
138+
return hash
139+
end
140+
141+
private
142+
def formatAttributeValue(key, value)
143+
case key
144+
when "M"
145+
formatAttributeValueMap(value)
146+
when "L"
147+
value.map! do |v|
148+
v = formatAttributeValue(v.keys.first, v.values.first)
149+
end
150+
when "NS","SS","BS"
151+
value.map! do |v|
152+
v = formatAttributeValue(key[0], v)
153+
end
154+
when "N"
155+
BigDecimal.new(value)
156+
when "NULL"
157+
nil
158+
else
159+
value
160+
end
161+
end
140162

141-
private
142-
def formatAttributeValue(key, value)
143-
case key
144-
when "M"
145-
formatAttributeValueMap(value)
146-
when "L"
147-
value.map! do |v|
148-
v = formatAttributeValue(v.keys.first, v.values.first)
149-
end
150-
when "NS","SS","BS"
151-
value.map! do |v|
152-
v = formatAttributeValue(key[0], v)
153163
end
154-
return value
155-
when "N"
156-
return BigDecimal.new(value)
157-
when "NULL"
158-
return nil
159-
else
160-
return value
161164
end
162165
end
163-
164166
end

lib/logstash/inputs/LogStashRecordProcessor.rb

+40-37
Original file line number | Diff line number | Diff line change
@@ -15,51 +15,54 @@
1515
#limitations under the License.
1616
#
1717
require "java"
18-
begin
19-
require 'jar-dependencies'
20-
require_jar( 'com.amazonaws', 'amazon-kinesis-client', '1.6.0' )
21-
require_jar( 'log4j', 'log4j', '1.2.17' )
22-
end
18+
19+
require "logstash-input-dynamodb_jars"
2320
java_import "com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason"
2421
java_import "java.lang.IllegalStateException"
2522
java_import "org.apache.log4j.LogManager"
2623

27-
class LogStashRecordProcessor
28-
include com.amazonaws.services.kinesis.clientlibrary.interfaces::IRecordProcessor
24+
module Logstash
25+
module Inputs
26+
module DynamoDB
27+
class LogStashRecordProcessor
28+
include com.amazonaws.services.kinesis.clientlibrary.interfaces::IRecordProcessor
2929

30-
attr_accessor :queue, :shard_id
30+
attr_accessor :queue, :shard_id
3131

32-
def initialize(queue)
33-
# Workaround for IRecordProcessor.initialize(String shardId) interfering with constructor.
34-
# No good way to overload methods in JRuby, so deciding which was supposed to be called here.
35-
if (queue.is_a? String)
36-
@shard_id = queue
37-
return
38-
else
39-
@queue ||= queue
40-
@logger ||= LogStash::Inputs::DynamoDB.logger
41-
end
42-
end
32+
def initialize(queue)
33+
# Workaround for IRecordProcessor.initialize(String shardId) interfering with constructor.
34+
# No good way to overload methods in JRuby, so deciding which was supposed to be called here.
35+
if (queue.is_a? String)
36+
@shard_id = queue
37+
return
38+
else
39+
@queue ||= queue
40+
@logger ||= LogStash::Inputs::DynamoDB.logger
41+
end
42+
end
4343

44-
def process_records(records, checkpointer)
45-
@logger.debug("Processing batch of " + records.size().to_s + " records")
46-
records.each do |record|
47-
@queue.push(record)
48-
end
49-
#checkpoint once all of the records have been consumed
50-
checkpointer.checkpoint()
51-
end
44+
def process_records(records, checkpointer)
45+
@logger.debug("Processing batch of " + records.size().to_s + " records")
46+
records.each do |record|
47+
@queue.push(record)
48+
end
49+
#checkpoint once all of the records have been consumed
50+
checkpointer.checkpoint()
51+
end
5252

53-
def shutdown(checkpointer, reason)
54-
case reason
55-
when ShutdownReason::TERMINATE
56-
checkpointer.checkpoint()
57-
when ShutdownReason::ZOMBIE
58-
else
59-
raise RuntimeError, "Invalid shutdown reason."
60-
end
61-
unless @shard_id.nil?
62-
@logger.info("shutting down record processor with shardId: " + @shard_id + " with reason " + reason.to_s)
53+
def shutdown(checkpointer, reason)
54+
case reason
55+
when ShutdownReason::TERMINATE
56+
checkpointer.checkpoint()
57+
when ShutdownReason::ZOMBIE
58+
else
59+
raise RuntimeError, "Invalid shutdown reason."
60+
end
61+
unless @shard_id.nil?
62+
@logger.info("shutting down record processor with shardId: " + @shard_id + " with reason " + reason.to_s)
63+
end
64+
end
65+
end
6366
end
6467
end
6568
end

0 commit comments

Comments
 (0)