diff --git a/man/nfpcapd.1 b/man/nfpcapd.1 index 844e3e43..8316c890 100755 --- a/man/nfpcapd.1 +++ b/man/nfpcapd.1 @@ -91,6 +91,14 @@ Specify name of pidfile. Default is no pidfile. Daemon mode: fork to background and detach from terminal. Nfpcapd terminates on signal TERM, INT and HUP. .TP 3 +.B -d +De-duplicate packets with a window size of 4. This option is useful, if +packets in a pcap file are duplicated for some unknown reason. If packets +on multiple span ports are sent to the collector they got de-duplicated within +the window size. Note: The de-duplication works on the IP layer to the end +of the packet, which means layer 1 and VLAN/MPLS layers are stripped. +If used together with -p, the resulting pcaps are de-duplicated as well. +.TP 3 .B -E Verbose flow printing. Print flows on stdout, when flushed to disk. Use verbose printing only for debugging purpose in order to see if your diff --git a/src/nfpcapd/Makefile.am b/src/nfpcapd/Makefile.am index 0b75d1a6..1b8e1768 100755 --- a/src/nfpcapd/Makefile.am +++ b/src/nfpcapd/Makefile.am @@ -28,4 +28,6 @@ if HAVEPCAPAPPEND AM_CPPFLAGS += -DHAVEPCAPAPPEND endif +EXTRA_DIST = murmurhash.c + CLEANFILES = *.gch diff --git a/src/nfpcapd/nfpcapd.c b/src/nfpcapd/nfpcapd.c index 477798a6..0ee7c467 100755 --- a/src/nfpcapd/nfpcapd.c +++ b/src/nfpcapd/nfpcapd.c @@ -133,6 +133,7 @@ static void usage(char *name) { "-r pcapfile\tread packets from file\n" "-b num\tset socket buffer size in MB. (default 20MB)\n" "-B num\tset the node cache size. (default 524288)\n" + "-d\t\tDe-duplicate packets with window size 8.\n" "-s snaplen\tset the snapshot length - default 1522\n" "-e active,inactive\tset the active,inactive flow expire time (s) - default 300,60\n" "-o options \tAdd flow options, separated with ','. Available: 'fat', 'payload'\n" @@ -280,7 +281,7 @@ static int scanOptions(flowParam_t *flowParam, char *options) { int main(int argc, char *argv[]) { sigset_t signal_set; struct sigaction sa; - int c, snaplen, bufflen, err, do_daemonize; + int c, snaplen, bufflen, err, do_daemonize, doDedup; int subdir_index, compress, expire, cache_size, buff_size; int activeTimeout, inactiveTimeout, metricInterval, workers; dirstat_t *dirstat; @@ -293,6 +294,7 @@ int main(int argc, char *argv[]) { snaplen = 1522; bufflen = 0; do_daemonize = 0; + doDedup = 0; launcher_pid = 0; device = NULL; pcapfile = NULL; @@ -319,7 +321,7 @@ int main(int argc, char *argv[]) { inactiveTimeout = 0; workers = 0; - while ((c = getopt(argc, argv, "b:B:C:De:g:hH:I:i:j:l:m:o:p:P:r:s:S:T:t:u:vVw:yz::")) != EOF) { + while ((c = getopt(argc, argv, "b:B:C:dDe:g:hH:I:i:j:l:m:o:p:P:r:s:S:T:t:u:vVw:yz::")) != EOF) { switch (c) { struct stat fstat; case 'h': @@ -341,6 +343,9 @@ int main(int argc, char *argv[]) { configFile = optarg; } break; + case 'd': + doDedup = 1; + break; case 'D': do_daemonize = 1; break; @@ -570,6 +575,7 @@ int main(int argc, char *argv[]) { flowParam_t flowParam = {0}; flushParam.extensionFormat = time_extension; flowParam.extensionFormat = time_extension; + packetParam.doDedup = doDedup; if (options && scanOptions(&flowParam, options) < 0) { exit(EXIT_FAILURE); @@ -778,8 +784,8 @@ int main(int argc, char *argv[]) { CloseMetric(); - LogInfo("Total: Processed: %u, skipped: %u, short caplen: %u, unknown: %u\n", packetParam.proc_stat.packets, packetParam.proc_stat.skipped, - packetParam.proc_stat.short_snap, packetParam.proc_stat.unknown); + LogInfo("Total: Processed: %u, skipped: %u, short caplen: %u, unknown: %u, duplicates: %llu\n", packetParam.proc_stat.packets, + packetParam.proc_stat.skipped, packetParam.proc_stat.short_snap, packetParam.proc_stat.unknown, packetParam.proc_stat.duplicates); if (pidfile) remove_pid(pidfile); diff --git a/src/nfpcapd/packet_pcap.h b/src/nfpcapd/packet_pcap.h index fa10dc34..6558c3cf 100755 --- a/src/nfpcapd/packet_pcap.h +++ b/src/nfpcapd/packet_pcap.h @@ -65,6 +65,7 @@ typedef struct proc_stat_s { uint32_t skipped; uint32_t unknown; uint32_t short_snap; + uint64_t duplicates; } proc_stat_t; #ifdef USE_TPACKETV3 @@ -96,6 +97,7 @@ typedef struct packetParam_s { pcap_t *pcap_dev; int t_win; int *done; + int doDedup; uint32_t snaplen; uint32_t linktype; diff --git a/src/nfpcapd/pcaproc.c b/src/nfpcapd/pcaproc.c index 8fde014a..68254e76 100644 --- a/src/nfpcapd/pcaproc.c +++ b/src/nfpcapd/pcaproc.c @@ -109,6 +109,8 @@ static inline void ProcessICMPFlow(packetParam_t *packetParam, struct FlowNode * static inline void ProcessOtherFlow(packetParam_t *packetParam, struct FlowNode *NewNode, void *payload, size_t payloadSize); +#include "murmurhash.c" + pcapfile_t *OpenNewPcapFile(pcap_t *p, char *filename, pcapfile_t *pcapfile) { if (!pcapfile) { // Create struct @@ -499,7 +501,7 @@ static inline void ProcessOtherFlow(packetParam_t *packetParam, struct FlowNode } // End of ProcessOtherFlow -void ProcessPacket(packetParam_t *packetParam, const struct pcap_pkthdr *hdr, const u_char *data) { +int ProcessPacket(packetParam_t *packetParam, const struct pcap_pkthdr *hdr, const u_char *data) { struct FlowNode *Node = NULL; uint16_t version, IPproto; char s1[64]; @@ -526,6 +528,7 @@ void ProcessPacket(packetParam_t *packetParam, const struct pcap_pkthdr *hdr, co // link layer processing uint16_t protocol = 0; uint32_t linktype = packetParam->linktype; + int redoLink = 0; REDO_LINK: switch (linktype) { case DLT_EN10MB: @@ -535,7 +538,7 @@ void ProcessPacket(packetParam_t *packetParam, const struct pcap_pkthdr *hdr, co int IEEE802 = protocol <= 1500; if (IEEE802) { packetParam->proc_stat.skipped++; - return; + return 1; } // unwrap link layer dataptr += 14; @@ -582,7 +585,7 @@ void ProcessPacket(packetParam_t *packetParam, const struct pcap_pkthdr *hdr, co break; default: LogInfo("Packet: %u: unsupported DLT_NULL protocol: 0x%x, packet: %u", pkg_cnt, header); - return; + return 1; } } break; case DLT_LINUX_SLL: @@ -601,12 +604,12 @@ void ProcessPacket(packetParam_t *packetParam, const struct pcap_pkthdr *hdr, co nflog_hdr_t *nflog_hdr = (nflog_hdr_t *)dataptr; if (hdr->caplen < sizeof(nflog_hdr_t)) { LogInfo("Packet: %u: NFLOG: not enough data", pkg_cnt); - return; + return 1; } if (nflog_hdr->nflog_version != 0) { LogInfo("Packet: %u: unsupported NFLOG version: %d", pkg_cnt, nflog_hdr->nflog_version); - return; + return 1; } dbg_printf("Linktype: DLT_NFLOG\n"); dbg_printf("NFLOG: %s, rid: %u\n", nflog_hdr->nflog_family == 2 ? "IPv4" : "IPv6", ntohs(nflog_hdr->nflog_rid)); @@ -620,7 +623,7 @@ void ProcessPacket(packetParam_t *packetParam, const struct pcap_pkthdr *hdr, co if (size % 4 != 0) size += 4 - size % 4; if (size < sizeof(nflog_tlv_t)) { LogInfo("Packet: %u: NFLOG: tlv size error: %u", pkg_cnt, size); - return; + return 1; } if (tlv->tlv_type == NFULA_PAYLOAD) { @@ -636,7 +639,7 @@ void ProcessPacket(packetParam_t *packetParam, const struct pcap_pkthdr *hdr, co pflog_hdr_t *pfloghdr = (pflog_hdr_t *)dataptr; if (hdr->caplen < PFLOG_HDRLEN) { LogInfo("Packet: %u: PFLOG: not enough data", pkg_cnt); - return; + return 1; } pflog = malloc(sizeof(pflog_hdr_t)); memcpy(pflog, pfloghdr, sizeof(pflog_hdr_t)); @@ -647,20 +650,20 @@ void ProcessPacket(packetParam_t *packetParam, const struct pcap_pkthdr *hdr, co } break; default: LogInfo("Packet: %u: unsupported link type: 0x%x, packet: %u", pkg_cnt, linktype); - return; + return 1; } REDO_LINK_PROTO: if (dataptr >= eodata) { packetParam->proc_stat.short_snap++; dbg_printf("Short packet: %u, Check line: %u", hdr->caplen, __LINE__); - return; + return 1; } dbg_printf("Next protocol: 0x%x\n", protocol); int IEEE802 = protocol <= 1500; if (IEEE802) { packetParam->proc_stat.skipped++; - return; + return 1; } switch (protocol) { case ETHERTYPE_IP: // IPv4 @@ -790,6 +793,23 @@ void ProcessPacket(packetParam_t *packetParam, const struct pcap_pkthdr *hdr, co goto END_FUNC; } + // IPv6 duplicate check + // dublicate check starts from the IP header over the rest of the packet + // vlan, mpls and layer 1 headers are ignored + if (unlikely(packetParam->doDedup && redoLink == 0)) { + // check for de-dup + uint32_t hopLimit = ip6->ip6_ctlun.ip6_un1.ip6_un1_hlim; + ip6->ip6_ctlun.ip6_un1.ip6_un1_hlim = 0; + uint16_t len = ntohs(ip6->ip6_ctlun.ip6_un1.ip6_un1_plen); + if (is_duplicate((const uint8_t *)ip, len + 40)) { + packetParam->proc_stat.duplicates++; + return 0; + } + ip6->ip6_ctlun.ip6_un1.ip6_un1_hlim = hopLimit; + // prevent recursive dedub checks with IP in IP packets + redoLink++; + } + // ipv6 Extension headers not processed IPproto = ip6->ip6_ctlun.ip6_un1.ip6_un1_nxt; dbg_printf("Packet IPv6, SRC %s, DST %s\n", inet_ntop(AF_INET6, &ip6->ip6_src, s1, sizeof(s1)), @@ -826,6 +846,25 @@ void ProcessPacket(packetParam_t *packetParam, const struct pcap_pkthdr *hdr, co goto END_FUNC; } + // IPv4 duplicate check + // dublicate check starts from the IP header over the rest of the packet + // vlan, mpls and layer 1 headers are ignored + if (unlikely(packetParam->doDedup && redoLink == 0)) { + // check for de-dup + uint32_t ttl = ip->ip_ttl; + uint32_t sum = ip->ip_sum; + ip->ip_ttl = 0; + ip->ip_sum = 0; + if (is_duplicate((const uint8_t *)ip, ntohs(ip->ip_len))) { + packetParam->proc_stat.duplicates++; + return 0; + } + ip->ip_ttl = ttl; + ip->ip_sum = sum; + // prevent recursive dedub checks with IP in IP packets + redoLink++; + } + IPproto = ip->ip_p; dbg_printf("Packet IPv4 SRC %s, DST %s\n", inet_ntop(AF_INET, &ip->ip_src, s1, sizeof(s1)), inet_ntop(AF_INET, &ip->ip_dst, s2, sizeof(s2))); @@ -1137,4 +1176,5 @@ void ProcessPacket(packetParam_t *packetParam, const struct pcap_pkthdr *hdr, co lastRun = hdr->ts.tv_sec; } + return 1; } // End of ProcessPacket diff --git a/src/nfpcapd/pcaproc.h b/src/nfpcapd/pcaproc.h index 113e5611..cb9f58b8 100755 --- a/src/nfpcapd/pcaproc.h +++ b/src/nfpcapd/pcaproc.h @@ -115,6 +115,6 @@ void RotateFile(pcapfile_t *pcapfile, time_t t_CloseRename, int live); void ProcessFlowNode(FlowSource_t *fs, struct FlowNode *node); -void ProcessPacket(packetParam_t *packetParam, const struct pcap_pkthdr *hdr, const u_char *data); +int ProcessPacket(packetParam_t *packetParam, const struct pcap_pkthdr *hdr, const u_char *data); #endif // _PCAPROC_H