Skip to content

Commit 6d345b5

Browse files
AbazigalSylvain Rodon
authored andcommitted
[IPFIX] Fix parsing when using buffered (TCP) input
When using a TCP input, packets' data are buffered before logstash tries do decode them. Therefore, our decode() function will receive chunks of "random" sizes, that might contain 2 PDUs, 3.4 PDUs, etc. The current code parses only one PDU and discards the rest of the payload. Therefore, we can easily miss a PDU, and the next call will most likely parse the middle of a PDU, which will result in an error. The file ipfix.dat used during CI is actually a good example : it contains 3 IPFIX messages. But so far, the code is only considering the first one, hence the 7 flows returned instead of the 13 that the file contains. This commit makes sure each call consumes all the PDUs available in the payload, and the remaining data (beginning of another PDU) are buffered to be reused in the next call.
1 parent e7d1119 commit 6d345b5

File tree

3 files changed

+237
-4
lines changed

3 files changed

+237
-4
lines changed

lib/logstash/codecs/netflow.rb

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class LogStash::Codecs::Netflow < LogStash::Codecs::Base
5252
def initialize(params = {})
5353
super(params)
5454
@threadsafe = true
55+
@payload_buffer = ""
5556
end
5657

5758
def clone
@@ -75,6 +76,12 @@ def register
7576

7677
def decode(payload, metadata = nil, &block)
7778
# BinData::trace_reading do
79+
unless @payload_buffer.empty?
80+
# Reuse previously bufferized payload
81+
payload = @payload_buffer + payload
82+
@payload_buffer = ""
83+
end
84+
7885
header = Header.read(payload)
7986

8087
unless @versions.include?(header.version)
@@ -100,9 +107,24 @@ def decode(payload, metadata = nil, &block)
100107
end
101108
elsif header.version == 10
102109
# BinData::trace_reading do
103-
flowset = IpfixPDU.read(payload)
104-
flowset.records.each do |record|
105-
decode_ipfix(flowset, record).each { |event| yield(event) }
110+
while payload.bytesize > 4
111+
flowset = IpfixShortPDU.read(payload)
112+
if flowset.pdu_length > payload.bytesize
113+
# Incomplete PDU => bufferize & wait for next call
114+
@payload_buffer = payload
115+
payload = ""
116+
else
117+
flowset = IpfixPDU.read(payload)
118+
flowset.records.each do |record|
119+
decode_ipfix(flowset, record).each { |event| yield(event) }
120+
end
121+
# Remove processed PDU from payload
122+
payload = payload.byteslice(flowset.pdu_length..-1)
123+
end
124+
end
125+
unless payload.empty?
126+
# Not enough bytes to read PDU length => bufferize & wait for next call
127+
@payload_buffer = payload
106128
end
107129
# end
108130
else

lib/logstash/codecs/netflow/util.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,12 @@ class IpfixOptionFlowset < BinData::Record
464464
end
465465
end
466466
467+
class IpfixShortPDU < BinData::Record
468+
endian :big
469+
uint16 :version
470+
uint16 :pdu_length
471+
end
472+
467473
class IpfixPDU < BinData::Record
468474
endian :big
469475
uint16 :version

spec/codecs/netflow_spec.rb

Lines changed: 206 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -662,10 +662,167 @@
662662
}
663663
END
664664

665+
events << <<-END
666+
{
667+
"@timestamp" : "2015-05-13T11:20:26.000Z",
668+
"netflow" : {
669+
"destinationIPv4Address" : "10.4.36.64",
670+
"destinationTransportPort" : 9200,
671+
"egressInterface" : 0,
672+
"flowEndSysUpTime" : 1356,
673+
"flowStartSysUpTime" : 1356,
674+
"icmpTypeCodeIPv4" : 0,
675+
"ingressInterface" : 0,
676+
"ipClassOfService" : 0,
677+
"ipVersion" : 4,
678+
"octetDeltaCount" : 60,
679+
"packetDeltaCount" : 1,
680+
"protocolIdentifier" : 6,
681+
"sourceIPv4Address" : "192.168.253.130",
682+
"sourceTransportPort" : 38254,
683+
"tcpControlBits" : 2,
684+
"version" : 10,
685+
"vlanId" : 0
686+
},
687+
"@version" : "1"
688+
}
689+
END
690+
691+
events << <<-END
692+
{
693+
"@timestamp" : "2015-05-13T11:20:28.000Z",
694+
"netflow" : {
695+
"destinationIPv4Address" : "192.168.253.128",
696+
"destinationTransportPort" : 22,
697+
"egressInterface" : 0,
698+
"flowEndSysUpTime" : 14611,
699+
"flowStartSysUpTime" : 12727,
700+
"icmpTypeCodeIPv4" : 0,
701+
"ingressInterface" : 0,
702+
"ipClassOfService" : 0,
703+
"ipVersion" : 4,
704+
"octetDeltaCount" : 256,
705+
"packetDeltaCount" : 4,
706+
"protocolIdentifier" : 6,
707+
"sourceIPv4Address" : "192.168.253.1",
708+
"sourceTransportPort" : 60560,
709+
"tcpControlBits" : 24,
710+
"version" : 10,
711+
"vlanId" : 0
712+
},
713+
"@version" : "1"
714+
}
715+
END
716+
717+
events << <<-END
718+
{
719+
"@timestamp" : "2015-05-13T11:20:28.000Z",
720+
"netflow" : {
721+
"destinationIPv4Address" : "192.168.253.1",
722+
"destinationTransportPort" : 60560,
723+
"egressInterface" : 0,
724+
"flowEndSysUpTime" : 14611,
725+
"flowStartSysUpTime" : 12727,
726+
"icmpTypeCodeIPv4" : 0,
727+
"ingressInterface" : 0,
728+
"ipClassOfService" : 0,
729+
"ipVersion" : 4,
730+
"octetDeltaCount" : 1916,
731+
"packetDeltaCount" : 3,
732+
"protocolIdentifier" : 6,
733+
"sourceIPv4Address" : "192.168.253.128",
734+
"sourceTransportPort" : 22,
735+
"tcpControlBits" : 24,
736+
"version" : 10,
737+
"vlanId" : 0
738+
},
739+
"@version" : "1"
740+
}
741+
END
742+
743+
events << <<-END
744+
{
745+
"@timestamp" : "2015-05-13T11:20:28.000Z",
746+
"netflow" : {
747+
"destinationIPv4Address" : "192.168.253.128",
748+
"destinationTransportPort" : 22,
749+
"egressInterface" : 0,
750+
"flowEndSysUpTime" : 12726,
751+
"flowStartSysUpTime" : 12725,
752+
"icmpTypeCodeIPv4" : 0,
753+
"ingressInterface" : 0,
754+
"ipClassOfService" : 0,
755+
"ipVersion" : 4,
756+
"octetDeltaCount" : 168,
757+
"packetDeltaCount" : 2,
758+
"protocolIdentifier" : 6,
759+
"sourceIPv4Address" : "192.168.253.1",
760+
"sourceTransportPort" : 65308,
761+
"tcpControlBits" : 24,
762+
"version" : 10,
763+
"vlanId" : 0
764+
},
765+
"@version" : "1"
766+
}
767+
END
768+
769+
770+
events << <<-END
771+
{
772+
"@timestamp" : "2015-05-13T11:20:28.000Z",
773+
"netflow" : {
774+
"destinationIPv4Address" : "192.168.253.1",
775+
"destinationTransportPort" : 65308,
776+
"egressInterface" : 0,
777+
"flowEndSysUpTime" : 12726,
778+
"flowStartSysUpTime" : 12725,
779+
"icmpTypeCodeIPv4" : 0,
780+
"ingressInterface" : 0,
781+
"ipClassOfService" : 0,
782+
"ipVersion" : 4,
783+
"octetDeltaCount" : 84,
784+
"packetDeltaCount" : 1,
785+
"protocolIdentifier" : 6,
786+
"sourceIPv4Address" : "192.168.253.128",
787+
"sourceTransportPort" : 22,
788+
"tcpControlBits" : 24,
789+
"version" : 10,
790+
"vlanId" : 0
791+
},
792+
"@version" : "1"
793+
}
794+
END
795+
796+
events << <<-END
797+
{
798+
"@timestamp" : "2015-05-13T11:20:28.000Z",
799+
"netflow" : {
800+
"destinationIPv4Address" : "224.0.0.251",
801+
"destinationTransportPort" : 5353,
802+
"egressInterface" : 0,
803+
"flowEndSysUpTime" : 12741,
804+
"flowStartSysUpTime" : 12741,
805+
"icmpTypeCodeIPv4" : 0,
806+
"ingressInterface" : 0,
807+
"ipClassOfService" : 0,
808+
"ipVersion" : 4,
809+
"octetDeltaCount" : 232,
810+
"packetDeltaCount" : 1,
811+
"protocolIdentifier" : 17,
812+
"sourceIPv4Address" : "192.168.253.1",
813+
"sourceTransportPort" : 5353,
814+
"tcpControlBits" : 0,
815+
"version" : 10,
816+
"vlanId" : 0
817+
},
818+
"@version" : "1"
819+
}
820+
END
821+
665822
end
666823

667824
it "should decode raw data" do
668-
expect(decode.size).to eq(7)
825+
expect(decode.size).to eq(13)
669826

670827
expect(decode[0].get("[netflow][version]")).to eq(10)
671828
expect(decode[0].get("[netflow][systemInitTimeMilliseconds]")).to eq(1431516013506)
@@ -711,6 +868,48 @@
711868
expect(decode[6].get("[netflow][destinationTransportPort]")).to eq(443)
712869
expect(decode[6].get("[netflow][protocolIdentifier]")).to eq(6)
713870
expect(decode[6].get("[netflow][tcpControlBits]")).to eq(26)
871+
872+
expect(decode[7].get("[netflow][sourceIPv4Address]")).to eq("192.168.253.130")
873+
expect(decode[7].get("[netflow][destinationIPv4Address]")).to eq("10.4.36.64")
874+
expect(decode[7].get("[netflow][sourceTransportPort]")).to eq(38254)
875+
expect(decode[7].get("[netflow][destinationTransportPort]")).to eq(9200)
876+
expect(decode[7].get("[netflow][protocolIdentifier]")).to eq(6)
877+
expect(decode[7].get("[netflow][tcpControlBits]")).to eq(2)
878+
879+
expect(decode[8].get("[netflow][sourceIPv4Address]")).to eq("192.168.253.1")
880+
expect(decode[8].get("[netflow][destinationIPv4Address]")).to eq("192.168.253.128")
881+
expect(decode[8].get("[netflow][sourceTransportPort]")).to eq(60560)
882+
expect(decode[8].get("[netflow][destinationTransportPort]")).to eq(22)
883+
expect(decode[8].get("[netflow][protocolIdentifier]")).to eq(6)
884+
expect(decode[8].get("[netflow][tcpControlBits]")).to eq(24)
885+
886+
expect(decode[9].get("[netflow][sourceIPv4Address]")).to eq("192.168.253.128")
887+
expect(decode[9].get("[netflow][destinationIPv4Address]")).to eq("192.168.253.1")
888+
expect(decode[9].get("[netflow][sourceTransportPort]")).to eq(22)
889+
expect(decode[9].get("[netflow][destinationTransportPort]")).to eq(60560)
890+
expect(decode[9].get("[netflow][protocolIdentifier]")).to eq(6)
891+
expect(decode[9].get("[netflow][tcpControlBits]")).to eq(24)
892+
893+
expect(decode[10].get("[netflow][sourceIPv4Address]")).to eq("192.168.253.1")
894+
expect(decode[10].get("[netflow][destinationIPv4Address]")).to eq("192.168.253.128")
895+
expect(decode[10].get("[netflow][sourceTransportPort]")).to eq(65308)
896+
expect(decode[10].get("[netflow][destinationTransportPort]")).to eq(22)
897+
expect(decode[10].get("[netflow][protocolIdentifier]")).to eq(6)
898+
expect(decode[10].get("[netflow][tcpControlBits]")).to eq(24)
899+
900+
expect(decode[11].get("[netflow][sourceIPv4Address]")).to eq("192.168.253.128")
901+
expect(decode[11].get("[netflow][destinationIPv4Address]")).to eq("192.168.253.1")
902+
expect(decode[11].get("[netflow][sourceTransportPort]")).to eq(22)
903+
expect(decode[11].get("[netflow][destinationTransportPort]")).to eq(65308)
904+
expect(decode[11].get("[netflow][protocolIdentifier]")).to eq(6)
905+
expect(decode[11].get("[netflow][tcpControlBits]")).to eq(24)
906+
907+
expect(decode[12].get("[netflow][sourceIPv4Address]")).to eq("192.168.253.1")
908+
expect(decode[12].get("[netflow][destinationIPv4Address]")).to eq("224.0.0.251")
909+
expect(decode[12].get("[netflow][sourceTransportPort]")).to eq(5353)
910+
expect(decode[12].get("[netflow][destinationTransportPort]")).to eq(5353)
911+
expect(decode[12].get("[netflow][protocolIdentifier]")).to eq(17)
912+
expect(decode[12].get("[netflow][tcpControlBits]")).to eq(0)
714913
end
715914

716915
it "should serialize to json" do
@@ -721,6 +920,12 @@
721920
expect(JSON.parse(decode[4].to_json)).to eq(JSON.parse(json_events[4]))
722921
expect(JSON.parse(decode[5].to_json)).to eq(JSON.parse(json_events[5]))
723922
expect(JSON.parse(decode[6].to_json)).to eq(JSON.parse(json_events[6]))
923+
expect(JSON.parse(decode[7].to_json)).to eq(JSON.parse(json_events[7]))
924+
expect(JSON.parse(decode[8].to_json)).to eq(JSON.parse(json_events[8]))
925+
expect(JSON.parse(decode[9].to_json)).to eq(JSON.parse(json_events[9]))
926+
expect(JSON.parse(decode[10].to_json)).to eq(JSON.parse(json_events[10]))
927+
expect(JSON.parse(decode[11].to_json)).to eq(JSON.parse(json_events[11]))
928+
expect(JSON.parse(decode[12].to_json)).to eq(JSON.parse(json_events[12]))
724929
end
725930

726931
end

0 commit comments

Comments
 (0)