From 885f7df6802a4b5b10286ecae6dc183ed0e0e6bf Mon Sep 17 00:00:00 2001 From: Peter Haag Date: Thu, 2 May 2024 14:08:31 +0200 Subject: [PATCH] Rework nflowcache. Replace hash code to speedup hash and have cleaner code for later multi-threading -s -A stats. --- src/nfdump/nflowcache.c | 1099 ++++++++++++++++++++++++--------------- 1 file changed, 684 insertions(+), 415 deletions(-) diff --git a/src/nfdump/nflowcache.c b/src/nfdump/nflowcache.c index bff6f966..e301344f 100755 --- a/src/nfdump/nflowcache.c +++ b/src/nfdump/nflowcache.c @@ -49,8 +49,6 @@ #include "blocksort.h" #include "config.h" #include "exporter.h" -#include "khash.h" -#include "klist.h" #include "maxmind/maxmind.h" #include "memhandle.h" #include "nfdump.h" @@ -74,7 +72,7 @@ static struct maskArray_s { uint64_t v6Mask[2]; } maskArray[MaxMaskArraySize] = {0}; // slot 0 empty -uint32_t maskIndex = 1; +static uint32_t maskIndex = 1; // For automatic output format generation in case of custom aggregation #define AggrPrependFmt "%ts %td " @@ -119,7 +117,7 @@ static struct aggregationElement_s { {"router", {EXipReceivedV4ID, OFFReceived4IP, SIZEReceived4IP, AF_INET}, 0, NOPREPROCESS, 0, 1, "%ra"}, {"router", {EXipReceivedV6ID, OFFReceived6IP, SIZEReceived6IP, AF_INET6}, 0, NOPREPROCESS, 0, 1, NULL}, {"insrcmac", {EXmacAddrID, OFFinSrcMac, SIZEinSrcMac, 0}, 0, NOPREPROCESS, 0, 0, "%ismc"}, - {"outdstmac", {EXmacAddrID, OFFoutDstMac, SIZEoutDstMac, 0}, 0, NOPREPROCESS, 0, 0, "%domc"}, + {"outdstmac", {EXmacAddrID, OFFoutDstMac, SIZEoutDstMac, 0}, 0, NOPREPROCESS, 0, 0, "%odmc"}, {"indstmac", {EXmacAddrID, OFFinDstMac, SIZEinDstMac, 0}, 0, NOPREPROCESS, 0, 0, "%idmc"}, {"outsrcmac", {EXmacAddrID, OFFoutSrcMac, SIZEoutSrcMac, 0}, 0, NOPREPROCESS, 0, 0, "%osmc"}, {"srcas", {EXasRoutingID, OFFsrcAS, SIZEsrcAS, 0}, 0, SRC_AS, 0, 0, "%sas"}, @@ -150,84 +148,114 @@ static struct aggregationElement_s { {"ethertype", {EXlayer2ID, OFFetherType, SIZEetherType, 0}, 0, NOPREPROCESS, 0, 0, "%eth"}, {NULL, {0, 0, 0}, 0, NOPREPROCESS, 0, 0, NULL}}; -/* Element of the flow hash ( cache ) */ +// FlowHash stat record, to aggregate flow counters in -A or -s stat/aggregate mode +// original flow record attached for later printing the record +// for -A -s hashkey points to the aggregation key in hash table +// for -O next points to next record in list typedef struct FlowHashRecord { - // record chain - for FlowList + recordHeaderV3_t *flowrecord; // orig flow record for printing union { - struct FlowHashRecord *next; - uint8_t *hashkey; + struct FlowHashRecord *next; // record chain for flow list + uint8_t *hashkey; // hash key for flow hash - check for swap }; - uint32_t hash; // the full 32bit hash value - cached for khash resize - uint16_t hashLen; // lenhth of hashKey + uint8_t inFlags; // tcp in flags uint8_t outFlags; // tcp out flags XXX unused currently + // 6 bytes padding unused // time info in msec - uint64_t msecFirst; - uint64_t msecLast; + uint64_t msecFirst; // overall first seen timestamp + uint64_t msecLast; // overall last seen timestamp - // flow counter parameters for FLOWS, INPACKETS, INBYTES, OUTPACKETS, OUTBYTES + // overall aggregated counters uint64_t inPackets; uint64_t inBytes; uint64_t outPackets; uint64_t outBytes; uint64_t flows; - recordHeaderV3_t *flowrecord; } FlowHashRecord_t; -// printing order definitions -typedef enum FlowDir { IN = 0, OUT, INOUT } flowDir_t; +// sorting record +// count - number to be sorted +typedef struct SortElement { + FlowHashRecord_t *flowRecord; + uint64_t count; +} SortElement_t; -typedef uint64_t (*order_proc_record_t)(FlowHashRecord_t *, flowDir_t); +// order functions prototype +// depending on the order mode -O, the appropriate function +// returns the value to be sorted - goes into SortElement record +typedef uint64_t (*order_proc_record_t)(FlowHashRecord_t *); // prototypes for order functions -static inline uint64_t null_record(FlowHashRecord_t *record, flowDir_t inout); -static inline uint64_t flows_record(FlowHashRecord_t *record, flowDir_t inout); -static inline uint64_t packets_record(FlowHashRecord_t *record, flowDir_t inout); -static inline uint64_t bytes_record(FlowHashRecord_t *record, flowDir_t inout); -static inline uint64_t pps_record(FlowHashRecord_t *record, flowDir_t inout); -static inline uint64_t bps_record(FlowHashRecord_t *record, flowDir_t inout); -static inline uint64_t bpp_record(FlowHashRecord_t *record, flowDir_t inout); -static inline uint64_t tstart_record(FlowHashRecord_t *record, flowDir_t inout); -static inline uint64_t tend_record(FlowHashRecord_t *record, flowDir_t inout); -static inline uint64_t duration_record(FlowHashRecord_t *record, flowDir_t inout); - -#define ASCENDING 1 -#define DESCENDING 0 +static inline uint64_t order_flows(FlowHashRecord_t *record); + +static inline uint64_t order_packets_in(FlowHashRecord_t *record); +static inline uint64_t order_packets_out(FlowHashRecord_t *record); +static inline uint64_t order_packets_inout(FlowHashRecord_t *record); + +static inline uint64_t order_bytes_in(FlowHashRecord_t *record); +static inline uint64_t order_bytes_out(FlowHashRecord_t *record); +static inline uint64_t order_bytes_inout(FlowHashRecord_t *record); + +static inline uint64_t order_pps_in(FlowHashRecord_t *record); +static inline uint64_t order_pps_out(FlowHashRecord_t *record); +static inline uint64_t order_pps_inout(FlowHashRecord_t *record); + +static inline uint64_t order_bps_in(FlowHashRecord_t *record); +static inline uint64_t order_bps_out(FlowHashRecord_t *record); +static inline uint64_t order_bps_inout(FlowHashRecord_t *record); + +static inline uint64_t order_bpp_in(FlowHashRecord_t *record); +static inline uint64_t order_bpp_out(FlowHashRecord_t *record); +static inline uint64_t order_bpp_inout(FlowHashRecord_t *record); + +static inline uint64_t order_tstart(FlowHashRecord_t *record); +static inline uint64_t order_tend(FlowHashRecord_t *record); +static inline uint64_t order_duration(FlowHashRecord_t *record); + +// printing order definitions +typedef enum FlowDir { IN = 0, OUT, INOUT } flowDir_t; +typedef enum PrintDir { DESCENDING = 0, ASCENDING } printDir_t; + static struct order_mode_s { - char *string; // Stat name - flowDir_t inout; // use IN or OUT or INOUT packets/bytes - int direction; // ascending or descending - order_proc_record_t record_function; // Function to call for record stats -} order_mode[] = {{"-", 0, 0, null_record}, // empty entry 0 - {"flows", IN, DESCENDING, flows_record}, - {"packets", INOUT, DESCENDING, packets_record}, - {"ipackets", IN, DESCENDING, packets_record}, - {"opackets", OUT, DESCENDING, packets_record}, - {"bytes", INOUT, DESCENDING, bytes_record}, - {"ibytes", IN, DESCENDING, bytes_record}, - {"obytes", OUT, DESCENDING, bytes_record}, - {"pps", INOUT, DESCENDING, pps_record}, - {"ipps", IN, DESCENDING, pps_record}, - {"opps", OUT, DESCENDING, pps_record}, - {"bps", INOUT, DESCENDING, bps_record}, - {"ibps", IN, DESCENDING, bps_record}, - {"obps", OUT, DESCENDING, bps_record}, - {"bpp", INOUT, DESCENDING, bpp_record}, - {"ibpp", IN, DESCENDING, bpp_record}, - {"obpp", OUT, DESCENDING, bpp_record}, - {"tstart", 0, ASCENDING, tstart_record}, - {"tend", 0, ASCENDING, tend_record}, - {"duration", 0, DESCENDING, duration_record}, - {NULL, 0, 0, NULL}}; + char *string; // Stat name + flowDir_t inout; // use IN or OUT or INOUT packets/bytes + printDir_t direction; // ascending or descending + order_proc_record_t record_function; // Function to call, returns sorting value +} order_mode[] = {{"-", 0, 0, NULL}, // empty entry 0 + {"flows", IN, DESCENDING, order_flows}, + {"packets", INOUT, DESCENDING, order_packets_inout}, + {"ipackets", IN, DESCENDING, order_packets_in}, + {"opackets", OUT, DESCENDING, order_packets_out}, + {"bytes", INOUT, DESCENDING, order_bytes_inout}, + {"ibytes", IN, DESCENDING, order_bytes_in}, + {"obytes", OUT, DESCENDING, order_bytes_out}, + {"pps", INOUT, DESCENDING, order_pps_inout}, + {"ipps", IN, DESCENDING, order_pps_in}, + {"opps", OUT, DESCENDING, order_pps_out}, + {"bps", INOUT, DESCENDING, order_bps_inout}, + {"ibps", IN, DESCENDING, order_bps_in}, + {"obps", OUT, DESCENDING, order_bps_out}, + {"bpp", INOUT, DESCENDING, order_bpp_inout}, + {"ibpp", IN, DESCENDING, order_bpp_in}, + {"obpp", OUT, DESCENDING, order_bpp_out}, + {"tstart", 0, ASCENDING, order_tstart}, + {"tend", 0, ASCENDING, order_tend}, + {"duration", 0, DESCENDING, order_duration}, + {NULL, 0, 0, NULL}}; // terminating entry + +#define MaxAggrStackSize 64 +static int aggregateInfo[MaxAggrStackSize] = {0}; static uint32_t FlowStat_order = 0; // bit field for multiple print orders static uint32_t PrintOrder = 0; // -O selected print order - index into order_mode -static uint32_t PrintDirection = 0; -static uint32_t GuessDirection = 0; -static uint32_t HasGeoDB = 0; +static uint32_t PrintDirection = 0; // ascending or descending +static uint32_t GuessDirection = 0; // try to guess flow direction for printing +static uint32_t HasGeoDB = 0; // GeoDB loaded +// predefined V6 hash key struct, used in -s record/.. typedef struct FlowKeyV6_s { uint16_t af; uint16_t srcPort; @@ -237,6 +265,7 @@ typedef struct FlowKeyV6_s { uint64_t dstAddr[2]; } FlowKeyV6_t; +// predefined V4 hash key struct, used in -s record/.. typedef struct FlowKeyV4_s { uint16_t af; uint16_t srcPort; @@ -246,43 +275,275 @@ typedef struct FlowKeyV4_s { uint32_t dstAddr; } FlowKeyV4_t; -typedef struct SortElement { - void *record; - uint64_t count; -} SortElement_t; +static inline void New_HashKey(void *keymem, recordHandle_t *recordHandle, int swap_flow); + +/* + * hash definition and implementation + * hash inspired by https://hackmd.io/@heyfey/SJZ-3jbs5 "Designing a Fast, Efficient, Cache-friendly Hash Table" + * implementation optimized for flow data, without SSE registers (ARM compatibility) + */ + +// hash function to generate 32bit hash value from var length input hash key +static inline uint32_t SuperFastHash(const char *data, int len) { + uint32_t hash = len; + + if (hash == 0 || data == NULL) return 0; + + int rem = len & 3; + len >>= 2; + + // Main loop + uint32_t tmp; + for (; len > 0; len--) { + hash += *((uint16_t *)data); + tmp = (*((uint16_t *)(data + 2)) << 11) ^ hash; + hash = (hash << 16) ^ tmp; + data += 2 * sizeof(uint16_t); + hash += hash >> 11; + } -// definitions for khash flow cache -typedef uint8_t *hashkey_t; // hash key - byte sequence + // Handle end cases + switch (rem) { + case 3: + hash += *((uint16_t *)data); + hash ^= hash << 16; + hash ^= data[sizeof(uint16_t)] << 18; + hash += hash >> 11; + break; + case 2: + hash += *((uint16_t *)data); + hash ^= hash << 11; + hash += hash >> 17; + break; + case 1: + hash += *data; + hash ^= hash << 10; + hash += hash >> 1; + } -static inline uint32_t SuperFastHash(const char *data, int len); + // Force "avalanching" of final 127 bits + hash ^= hash << 3; + hash += hash >> 5; + hash ^= hash << 4; + hash += hash >> 17; + hash ^= hash << 25; + hash += hash >> 6; + + return hash; +} // End of SuperFastHash + +// cell index calculation from 32bit hash, depending of hash bit size 'shift' +#define ___fib_hash(hash, shift) ((hash) * 2654435769U) >> (shift) + +// flag macros +#define is_free(flag, i) (flag[i] == 0) +#define is_used(flag, i) (flag[i] != 0) /* -// hash func - reduce byte sequence to kh_int -static kh_inline khint_t __HashFunc(const FlowHashRecord_t record) { - return SuperFastHash((char *)record.hashkey, hashKeyLen); -} -*/ -#define __HashFunc(k) (k).hash + * 3way hash: + * - flag array - uint8_t flag 0x1hhh hhhh - lower 7 bits of 32 hash + * - value array - hashValue_t with dynamic hash key for -s or -A aggregation, with index points to record array + * - record array - static stat record with flow record counters + * + * only keep hashValue (32bytes per entry) in hash table. + * for normal IPv4 aggragation, a key size of 16bytes fit directly into the hashValue. Aggregations up to 16byte + * hash values profit from fast CPU cache. + * For aggregation values > 16 bytes, a valPtr points to the nmalloc() value + */ + +// value type for hash +typedef struct hashValue_s { + union { + uint64_t val[2]; // 16 byte static hash value + void *valPtr; // value pointer if size > 16bytes + }; + uint32_t allign; // unused - 64bit alignment + uint32_t hash; // calculated 32bit hash + uint32_t ptrSize; // if > 0, valPtr points to value + uint32_t index; // index into record array for statistics values +} hashValue_t; + +// compare two hash values +// if size == 0 - directly compare the 16byte local value as two uint64_t +// if size > 16 - compare calculated hash and memcmp the two valPtr +#define valCompare(v1, v2) \ + ((v1).ptrSize == 0 ? ((v1).val[0] == (v2).val[0] && (v1).val[1] == (v2).val[1]) \ + : ((v1).hash == (v2).hash && (v1).ptrSize == (v2).ptrSize && memcmp((v1).valPtr, (v2).valPtr, (v1).ptrSize) == 0)) + +// hash definition +typedef struct flowHash_s { + uint8_t *flags; // 1 byte flag of hash: + // 0x0000 0000 - hash cell not in use + // 0x1... .... - hash cell in use + // 0x1hhh hhhh - lower 7 bits of hash + hashValue_t *cells; // hash cells + FlowHashRecord_t *records; // statistic records + uint32_t count; // number of used cells in hash + uint32_t capacity; // allocated cells + uint32_t mask; // mask for max index + uint32_t load_factor; // no more than load_factor until resize + int shift; // 32 - shift = bit width of hash +} flowHash_t; -// compare func - compare two hash keys -static kh_inline khint_t __HashEqual(FlowHashRecord_t r1, FlowHashRecord_t r2) { - return r1.hash == r2.hash && r1.hashLen == r2.hashLen && memcmp((void *)r1.hashkey, (void *)r2.hashkey, r2.hashLen) == 0; -} -// insert FlowHash definitions/code -KHASH_INIT(FlowHash, FlowHashRecord_t, char, 0, __HashFunc, __HashEqual) // FlowHash var -static khash_t(FlowHash) *FlowHash = NULL; -sig_atomic_t lock = 0; +static flowHash_t *flowHash = NULL; + +static flowHash_t *flowHash_init(void) { + flowHash_t *flowHash = calloc(1, sizeof(flowHash_t)); + if (!flowHash) return NULL; + + // start with capacity 512 + flowHash->shift = 23; + flowHash->capacity = 512; + flowHash->mask = flowHash->capacity - 1; + + flowHash->count = 0; + flowHash->load_factor = 256; + flowHash->flags = calloc(flowHash->capacity, sizeof(uint8_t)); + flowHash->cells = calloc(flowHash->capacity, sizeof(FlowHashRecord_t)); + flowHash->records = calloc(flowHash->capacity, sizeof(FlowHashRecord_t)); + return flowHash->cells != NULL && flowHash->flags != NULL ? flowHash : NULL; +} // End of flowHash_init + +static void flowHash_free(void) { + if (!flowHash) return; + + free(flowHash->flags); + free(flowHash->cells); + free(flowHash->records); + free(flowHash); + flowHash = NULL; + +} // End of flowHash_free + +/* + * resize hash: + * resize flags and cell array and rearrange entries + * records remain in same place, but memory gets resized + */ +static inline void flowHash_resize(flowHash_t *flowHash) { + int oldCapacity = flowHash->load_factor = flowHash->capacity; + flowHash->capacity = 1u << (32 - (--flowHash->shift)); + flowHash->mask = flowHash->capacity - 1; + + hashValue_t *oldCells = flowHash->cells; + hashValue_t *newCells = calloc(flowHash->capacity, sizeof(hashValue_t)); + + uint8_t *oldFlags = flowHash->flags; + uint8_t *newFlags = calloc(flowHash->capacity, sizeof(uint8_t)); + + FlowHashRecord_t *newRecords = realloc(flowHash->records, flowHash->capacity * sizeof(FlowHashRecord_t)); + assert(newFlags && newCells && newRecords); + + // rearrange cells and flags, according to hash and new bit width of hash table + for (uint32_t i = 0; i < oldCapacity; i++) { + if (is_used(oldFlags, i)) { + uint32_t cell = ___fib_hash(oldCells[i].hash, flowHash->shift); + while (is_used(newFlags, cell)) { + cell = (cell + 1) & flowHash->mask; + } + newCells[cell] = oldCells[i]; + newFlags[cell] = oldFlags[i]; + } + } + flowHash->cells = newCells; + flowHash->flags = newFlags; + flowHash->records = newRecords; + free(oldCells); + free(oldFlags); +} // End of flowHash_resize + +/* + * Adds new value to the hash table. + * insert is set to + * 0 - value exists already. + * 1 - value was inserted. + * returns the index into the stat record array of new or existing value + */ +static inline int flowHash_add(flowHash_t *flowHash, const hashValue_t *value, int *insert) { + uint32_t hash, cell; + + if (flowHash->count == flowHash->load_factor) flowHash_resize(flowHash); + + hash = value->hash; + // cell address + cell = ___fib_hash(hash, flowHash->shift); + + uint8_t flag = 0x80 | (hash & 0x7F); + // shortcut for likely unused cell - speed up + if (is_free(flowHash->flags, cell)) { + int index = flowHash->count++; + flowHash->flags[cell] = flag; + flowHash->cells[cell] = *value; + flowHash->cells[cell].index = index; + *insert = 1; + return index; + } + + // loop until existing value or empty cell is found + do { + // find empty cell or cell with correct flags + while (is_used(flowHash->flags, cell) && (flowHash->flags[cell] != flag)) + if (++cell == flowHash->capacity) cell = 0; + + if (is_free(flowHash->flags, cell)) { + // free cell found + int index = flowHash->count++; + flowHash->flags[cell] = flag; + flowHash->cells[cell] = *value; + flowHash->cells[cell].index = index; + *insert = 1; + return index; + } else { + // cell with matching flag + if (valCompare(flowHash->cells[cell], *value)) { + // existing value found + *insert = 0; + return flowHash->cells[cell].index; + } + } + // hash collision - cell used by another value + if (++cell == flowHash->capacity) cell = 0; + } while (1); + +} // End of flowHash_add + +/* + * Searches for an existing value in the hash table. + * returns: + * index into the stat record array if found + * -1 if value does not exists + */ +static inline int flowHash_get(flowHash_t *flowHash, hashValue_t *value) { + uint32_t hash = value->hash; + // cell address + uint32_t cell = ___fib_hash(hash, flowHash->shift); + + // shortcut to speed up if cell is empty + if (is_free(flowHash->flags, cell)) return -1; + + uint8_t flag = 0x80 | (hash & 0x7F); + // cell used, check for correct value + do { + // search for matching flag + while (is_used(flowHash->flags, cell) && (flowHash->flags[cell] != flag)) + if (++cell == flowHash->capacity) cell = 0; + + if (is_free(flowHash->flags, cell)) return -1; + if (valCompare(flowHash->cells[cell], *value)) return flowHash->cells[cell].index; + + // collision - flag matches but compare does not - loop + if (++cell == flowHash->capacity) cell = 0; + } while (1); +} -// linear FlowList +// linear FlowList for -O sorting static struct FlowList_s { FlowHashRecord_t *head; FlowHashRecord_t **tail; size_t NumRecords; } FlowList = {0}; -#define MaxAggrStackSize 64 -static int aggregateInfo[MaxAggrStackSize] = {0}; static void *keymemV4 = NULL; static void *keymemV6 = NULL; static size_t keymenV4Len = 0; @@ -295,22 +556,11 @@ static uint32_t bidir_flows = 0; #include "nfdump_inline.c" #include "nffile_inline.c" -#undef get16bits -#if (defined(__GNUC__) && defined(__i386__)) || defined(__WATCOMC__) || defined(_MSC_VER) || defined(__BORLANDC__) || defined(__TURBOC__) -#define get16bits(d) (*((const uint16_t *)(d))) -#endif - -#if !defined(get16bits) -#define get16bits(d) ((((uint32_t)(((const uint8_t *)(d))[1])) << 8) + (uint32_t)(((const uint8_t *)(d))[0])) -#endif - #define NeedSwapGeneric(GuessDir, r) \ (GuessDir && ((r)->proto == IPPROTO_TCP || (r)->proto == IPPROTO_UDP) && \ ((((r)->srcPort < 1024) && ((r)->dstPort >= 1024)) || (((r)->srcPort < 32768) && ((r)->dstPort >= 32768)) || \ (((r)->srcPort < 49152) && ((r)->dstPort >= 49152)))) -static inline void *New_HashKey(void *keymem, recordHandle_t *recordHandle, int swap_flow); - static inline int NeedSwap(int GuessDir, void *genericFlowKey); static SortElement_t *GetSortList(size_t *size); @@ -345,6 +595,148 @@ static inline int NeedSwap(int GuessDir, void *genericFlowKey) { } } // End of NeedSwap +static uint64_t order_flows(FlowHashRecord_t *record) { return record->flows; } + +static uint64_t order_packets_in(FlowHashRecord_t *record) { + if (NeedSwap(GuessDirection, record->hashkey)) { + return record->outPackets; + } else { + return record->inPackets; + } +} // End of order_packets_in + +static uint64_t order_packets_out(FlowHashRecord_t *record) { + if (NeedSwap(GuessDirection, record->hashkey)) { + return record->inPackets; + } else { + return record->outPackets; + } +} // End of order_packets_out + +static uint64_t order_packets_inout(FlowHashRecord_t *record) { + // + return record->inPackets + record->outPackets; +} // End of order_packets_inout + +static uint64_t order_bytes_in(FlowHashRecord_t *record) { + if (NeedSwap(GuessDirection, record->hashkey)) { + return record->outBytes; + } else { + return record->inBytes; + } +} // End of order_bytes_in + +static uint64_t order_bytes_out(FlowHashRecord_t *record) { + if (NeedSwap(GuessDirection, record->hashkey)) { + return record->inBytes; + } else { + return record->outBytes; + } +} // End of order_bytes_out + +static uint64_t order_bytes_inout(FlowHashRecord_t *record) { + // + return record->inBytes + record->outBytes; +} // End of order_bytes_inout + +static uint64_t order_pps_in(FlowHashRecord_t *record) { + /* duration in msec */ + if (record->msecLast == 0) return 0; + + uint64_t duration = record->msecLast - record->msecFirst; + uint64_t packets = record->inPackets; + return (1000LL * packets) / duration; + +} // End of order_pps_in + +static uint64_t order_pps_out(FlowHashRecord_t *record) { + /* duration in msec */ + if (record->msecLast == 0) return 0; + + uint64_t duration = record->msecLast - record->msecFirst; + uint64_t packets = record->outPackets; + return (1000LL * packets) / duration; + +} // End of order_pps_out + +static uint64_t order_pps_inout(FlowHashRecord_t *record) { + /* duration in msec */ + if (record->msecLast == 0) return 0; + + uint64_t duration = record->msecLast - record->msecFirst; + uint64_t packets = record->inPackets + record->outPackets; + return (1000LL * packets) / duration; + +} // End of order_pps_inout + +static uint64_t order_bps_in(FlowHashRecord_t *record) { + /* duration in msec */ + if (record->msecLast == 0) return 0; + + uint64_t duration = record->msecLast - record->msecFirst; + uint64_t bytes = record->inBytes; + return (8000LL * bytes) / duration; /* 8 bits per Octet - x 1000 for msec */ + +} // End of order_bps_in + +static uint64_t order_bps_out(FlowHashRecord_t *record) { + /* duration in msec */ + if (record->msecLast == 0) return 0; + + uint64_t duration = record->msecLast - record->msecFirst; + uint64_t bytes = record->outBytes; + return (8000LL * bytes) / duration; /* 8 bits per Octet - x 1000 for msec */ + +} // End of order_bps_out + +static uint64_t order_bps_inout(FlowHashRecord_t *record) { + /* duration in msec */ + if (record->msecLast == 0) return 0; + + uint64_t duration = record->msecLast - record->msecFirst; + uint64_t bytes = record->inBytes + record->outBytes; + return (8000LL * bytes) / duration; /* 8 bits per Octet - x 1000 for msec */ + +} // End of order_bps_inout + +static uint64_t order_bpp_in(FlowHashRecord_t *record) { + uint64_t packets = record->inPackets; + if (packets == 0) return 0; + + uint64_t bytes = record->inBytes; + return bytes / packets; +} // End of order_bpp_in + +static uint64_t order_bpp_out(FlowHashRecord_t *record) { + uint64_t packets = record->outPackets; + if (packets == 0) return 0; + + uint64_t bytes = record->outBytes; + return bytes / packets; +} // End of order_bpp_out + +static uint64_t order_bpp_inout(FlowHashRecord_t *record) { + uint64_t packets = record->inPackets + record->outPackets; + if (packets == 0) return 0; + + uint64_t bytes = record->outBytes + record->outBytes; + return bytes / packets; +} // End of order_bpp_inout + +static uint64_t order_tstart(FlowHashRecord_t *record) { + // + return record->msecFirst; +} // End of order_tstart + +static uint64_t order_tend(FlowHashRecord_t *record) { + // + return record->msecLast; +} // End of order_tend + +static uint64_t order_duration(FlowHashRecord_t *record) { + return record->msecLast ? (record->msecLast - record->msecFirst) : 0; +} // End of order_duration + static inline void PreProcess(void *inPtr, preprocess_t process, recordHandle_t *recordHandle) { EXipv4Flow_t *ipv4Flow = (EXipv4Flow_t *)recordHandle->extensionList[EXipv4FlowID]; EXipv6Flow_t *ipv6Flow = (EXipv6Flow_t *)recordHandle->extensionList[EXipv6FlowID]; @@ -382,55 +774,10 @@ static inline void PreProcess(void *inPtr, preprocess_t process, recordHandle_t } } // End of PreProcess -static inline uint32_t SuperFastHash(const char *data, int len) { - uint32_t hash = len; - - if (len <= 0 || data == NULL) return 0; - - int rem = len & 3; - len >>= 2; - - /* Main loop */ - uint32_t tmp; - for (; len > 0; len--) { - hash += get16bits(data); - tmp = (get16bits(data + 2) << 11) ^ hash; - hash = (hash << 16) ^ tmp; - data += 2 * sizeof(uint16_t); - hash += hash >> 11; - } - - /* Handle end cases */ - switch (rem) { - case 3: - hash += get16bits(data); - hash ^= hash << 16; - hash ^= data[sizeof(uint16_t)] << 18; - hash += hash >> 11; - break; - case 2: - hash += get16bits(data); - hash ^= hash << 11; - hash += hash >> 17; - break; - case 1: - hash += *data; - hash ^= hash << 10; - hash += hash >> 1; - } - - /* Force "avalanching" of final 127 bits */ - hash ^= hash << 3; - hash += hash >> 5; - hash ^= hash << 4; - hash += hash >> 17; - hash ^= hash << 25; - hash += hash >> 6; - - return hash; -} - -static inline void *New_HashKey(void *keymem, recordHandle_t *recordHandle, int swap_flow) { +/* + * generate dynamic hash value for hast table, depending on -s or -A parameters + */ +static inline void New_HashKey(void *keymem, recordHandle_t *recordHandle, int swap_flow) { EXipv4Flow_t *ipv4Flow = (EXipv4Flow_t *)recordHandle->extensionList[EXipv4FlowID]; EXipv6Flow_t *ipv6Flow = (EXipv6Flow_t *)recordHandle->extensionList[EXipv6FlowID]; EXgenericFlow_t *genericFlow = (EXgenericFlow_t *)recordHandle->extensionList[EXgenericFlowID]; @@ -534,80 +881,8 @@ static inline void *New_HashKey(void *keymem, recordHandle_t *recordHandle, int } } - return keymem; - } // End of New_HashKey -static uint64_t null_record(FlowHashRecord_t *record, flowDir_t inout) { return 0; } - -static uint64_t flows_record(FlowHashRecord_t *record, flowDir_t inout) { return record->flows; } - -static uint64_t packets_record(FlowHashRecord_t *record, flowDir_t inout) { - if (NeedSwap(GuessDirection, record->hashkey)) { - if (inout == IN) - inout = OUT; - else if (inout == OUT) - inout = IN; - } - if (inout == IN) - return record->inPackets; - else if (inout == OUT) - return record->outPackets; - else - return record->inPackets + record->outPackets; -} - -static uint64_t bytes_record(FlowHashRecord_t *record, flowDir_t inout) { - if (NeedSwap(GuessDirection, record->hashkey)) { - if (inout == IN) - inout = OUT; - else if (inout == OUT) - inout = IN; - } - if (inout == IN) - return record->inBytes; - else if (inout == OUT) - return record->outBytes; - else - return record->inBytes + record->outBytes; -} - -static uint64_t pps_record(FlowHashRecord_t *record, flowDir_t inout) { - /* duration in msec */ - uint64_t duration = record->msecLast ? record->msecLast - record->msecFirst : 0; - if (duration == 0) - return 0; - else { - uint64_t packets = packets_record(record, inout); - return (1000LL * packets) / duration; - } -} // End of pps_record - -static uint64_t bps_record(FlowHashRecord_t *record, flowDir_t inout) { - uint64_t duration = record->msecLast ? record->msecLast - record->msecFirst : 0; - if (duration == 0) - return 0; - else { - uint64_t bytes = bytes_record(record, inout); - return (8000LL * bytes) / duration; /* 8 bits per Octet - x 1000 for msec */ - } -} // End of bps_record - -static uint64_t bpp_record(FlowHashRecord_t *record, flowDir_t inout) { - uint64_t packets = packets_record(record, inout); - uint64_t bytes = bytes_record(record, inout); - - return packets ? bytes / packets : 0; -} // End of bpp_record - -static uint64_t tstart_record(FlowHashRecord_t *record, flowDir_t inout) { return record->msecFirst; } // End of tstart_record - -static uint64_t tend_record(FlowHashRecord_t *record, flowDir_t inout) { return record->msecLast; } // End of tend_record - -static uint64_t duration_record(FlowHashRecord_t *record, flowDir_t inout) { - return record->msecLast ? (record->msecLast - record->msecFirst) : 0; -} // End of duration_record - static void ApplyAggregateMask(recordHandle_t *recordHandle, struct aggregationElement_s *aggregationElement) { EXipv4Flow_t *ipv4Flow = (EXipv4Flow_t *)recordHandle->extensionList[EXipv4FlowID]; EXipv6Flow_t *ipv6Flow = (EXipv6Flow_t *)recordHandle->extensionList[EXipv6FlowID]; @@ -681,58 +956,10 @@ static void ApplyNetMaskBits(recordHandle_t *recordHandle, struct aggregationEle } // End of ApplyNetMaskBits -// return a linear list of aggregated/listed flows for later sorting -static SortElement_t *GetSortList(size_t *size) { - dbg_printf("Enter %s\n", __func__); - SortElement_t *list; - - size_t hashSize = kh_size(FlowHash); - if (hashSize) { // aggregated flows in khash - list = (SortElement_t *)calloc(hashSize, sizeof(SortElement_t)); - if (!list) { - LogError("calloc() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno)); - *size = 0; - return NULL; - } - - int c = 0; - for (khiter_t k = kh_begin(FlowHash); k != kh_end(FlowHash); ++k) { // traverse - if (kh_exist(FlowHash, k)) { - FlowHashRecord_t *r = &kh_key(FlowHash, k); - list[c++].record = (void *)r; - } - } - *size = hashSize; - - } else { // linear flow list - size_t listSize = FlowList.NumRecords; - if (!listSize) { - *size = 0; - return NULL; - } - list = (SortElement_t *)calloc(listSize, sizeof(SortElement_t)); - if (!list) { - LogError("calloc() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno)); - *size = 0; - return NULL; - } - - FlowHashRecord_t *r = FlowList.head; - for (int i = 0; i < listSize; i++) { - list[i].record = (void *)r; - r = r->next; - } - *size = listSize; - } - - return list; - -} // End of GetSortList - int Init_FlowCache(int hasGeoDB) { if (!nfalloc_Init(0)) return 0; - FlowHash = kh_init(FlowHash); + flowHash = flowHash_init(); FlowList = (struct FlowList_s){.head = NULL, .tail = &FlowList.head, .NumRecords = 0}; keymenV4Len = sizeof(FlowKeyV4_t); keymenV6Len = sizeof(FlowKeyV6_t); @@ -743,7 +970,10 @@ int Init_FlowCache(int hasGeoDB) { } // End of Init_FlowCache -void Dispose_FlowTable(void) { nfalloc_free(); } // End of Dispose_FlowTable +void Dispose_FlowTable(void) { + flowHash_free(); + nfalloc_free(); +} // End of Dispose_FlowTable // Parse flow cache print order -O int Parse_PrintOrder(char *order) { @@ -774,7 +1004,7 @@ int Parse_PrintOrder(char *order) { return -1; } - PrintDirection = direction >= 0 ? direction : order_mode[PrintOrder].direction; + PrintDirection = direction >= 0 ? direction : order_mode[PrintOrder].direction; // A return PrintOrder; @@ -809,16 +1039,16 @@ int SetRecordStat(char *statType, char *optOrder) { *r++ = 0; switch (*r) { case 'a': - PrintDirection = ASCENDING; + PrintDirection = ASCENDING; // A break; case 'd': - PrintDirection = DESCENDING; + PrintDirection = DESCENDING; // A break; default: return -1; } } else { - PrintDirection = DESCENDING; + PrintDirection = DESCENDING; // A } int i = 0; @@ -1067,8 +1297,6 @@ static void AddBidirFlow(recordHandle_t *recordHandle) { aggrFlows = cntFlow->flows; } - static void *bidirkeymem = NULL; - size_t keyLen = 0; void **keymem = NULL; if (ipv4Flow) { @@ -1080,47 +1308,51 @@ static void AddBidirFlow(recordHandle_t *recordHandle) { } else return; - if (*keymem == NULL) *keymem = nfmalloc(keyLen); + hashValue_t hashValue = {0}; + if (keyLen > 16) { + if (*keymem == NULL) *keymem = nfmalloc(keyLen); + hashValue.valPtr = *keymem; + hashValue.ptrSize = keyLen; + } else { + *keymem = &hashValue.val; + } + // generate hash value from selected -s -A parameters New_HashKey(*keymem, recordHandle, 0); - uint32_t forwardHash = SuperFastHash(*keymem, keyLen); - - FlowHashRecord_t r; - r.hashkey = *keymem; - r.hashLen = keyLen; - r.hash = forwardHash; - - int ret; - khiter_t k = kh_get(FlowHash, FlowHash, r); - if (k != kh_end(FlowHash)) { - // flow record found - best case! update all fields - kh_key(FlowHash, k).inBytes += inBytes; - kh_key(FlowHash, k).inPackets += inPackets; - kh_key(FlowHash, k).outBytes += outBytes; - kh_key(FlowHash, k).outPackets += outPackets; - kh_key(FlowHash, k).inFlags |= genericFlow->tcpFlags; - - if (genericFlow->msecFirst < kh_key(FlowHash, k).msecFirst) { - kh_key(FlowHash, k).msecFirst = genericFlow->msecFirst; + // generate 32bit hash from hash value + hashValue.hash = SuperFastHash(*keymem, keyLen); + + int index = flowHash_get(flowHash, &hashValue); + if (index >= 0) { + // flow record found - update all fields + flowHash->records[index].inBytes += inBytes; + flowHash->records[index].inPackets += inPackets; + flowHash->records[index].outBytes += outBytes; + flowHash->records[index].outPackets += outPackets; + flowHash->records[index].inFlags |= genericFlow->tcpFlags; + + if (genericFlow->msecFirst < flowHash->records[index].msecFirst) { + flowHash->records[index].msecFirst = genericFlow->msecFirst; } - if (genericFlow->msecLast > kh_key(FlowHash, k).msecLast) { - kh_key(FlowHash, k).msecLast = genericFlow->msecLast; + if (genericFlow->msecLast > flowHash->records[index].msecLast) { + flowHash->records[index].msecLast = genericFlow->msecLast; } - kh_key(FlowHash, k).flows += aggrFlows; + flowHash->records[index].flows += aggrFlows; } else if (genericFlow->proto != IPPROTO_TCP && genericFlow->proto != IPPROTO_UDP) { // no flow record found and no TCP/UDP bidir flows. Insert flow record into hash - k = kh_put(FlowHash, FlowHash, r, &ret); - kh_key(FlowHash, k).inBytes = inBytes; - kh_key(FlowHash, k).inPackets = inPackets; - kh_key(FlowHash, k).outBytes = outBytes; - kh_key(FlowHash, k).outPackets = outPackets; - kh_key(FlowHash, k).flows = aggrFlows; - kh_key(FlowHash, k).inFlags = genericFlow->tcpFlags; - kh_key(FlowHash, k).outFlags = 0; - - kh_key(FlowHash, k).msecFirst = genericFlow->msecFirst; - kh_key(FlowHash, k).msecLast = genericFlow->msecLast; + int insert; + index = flowHash_add(flowHash, &hashValue, &insert); + flowHash->records[index].inBytes = inBytes; + flowHash->records[index].inPackets = inPackets; + flowHash->records[index].outBytes = outBytes; + flowHash->records[index].outPackets = outPackets; + flowHash->records[index].flows = aggrFlows; + flowHash->records[index].inFlags = genericFlow->tcpFlags; + flowHash->records[index].outFlags = 0; + + flowHash->records[index].msecFirst = genericFlow->msecFirst; + flowHash->records[index].msecLast = genericFlow->msecLast; void *p = malloc(record->size); if (!p) { @@ -1128,7 +1360,8 @@ static void AddBidirFlow(recordHandle_t *recordHandle) { exit(255); } memcpy((void *)p, record, record->size); - kh_key(FlowHash, k).flowrecord = p; + flowHash->records[index].flowrecord = p; + flowHash->records[index].hashkey = *keymem; // keymen got part of the cache *keymem = NULL; @@ -1136,51 +1369,44 @@ static void AddBidirFlow(recordHandle_t *recordHandle) { // for bidir flows do // generate reverse hash key to search for bidir flow - // we need it only to lookup - if (bidirkeymem == NULL) { - bidirkeymem = nfmalloc(keymenV6Len); - } - - // generate the hash key for reverse record (bidir) - New_HashKey(bidirkeymem, recordHandle, 1); - r.hashkey = bidirkeymem; - r.hashLen = keyLen; - r.hash = SuperFastHash(bidirkeymem, keyLen); - - k = kh_get(FlowHash, FlowHash, r); - if (k != kh_end(FlowHash)) { - // we found a corresponding flow - so update all fields in reverse direction - kh_key(FlowHash, k).outBytes += inBytes; - kh_key(FlowHash, k).outPackets += inPackets; - kh_key(FlowHash, k).inBytes += outBytes; - kh_key(FlowHash, k).inPackets += outPackets; - kh_key(FlowHash, k).outFlags |= genericFlow->tcpFlags; - - if (genericFlow->msecFirst < kh_key(FlowHash, k).msecFirst) { - kh_key(FlowHash, k).msecFirst = genericFlow->msecFirst; + New_HashKey(*keymem, recordHandle, 1); + hashValue.hash = SuperFastHash(*keymem, keyLen); + + index = flowHash_get(flowHash, &hashValue); + if (index >= 0) { + // we found a corresponding reverse flow - so update all fields in reverse direction + flowHash->records[index].outBytes += inBytes; + flowHash->records[index].outPackets += inPackets; + flowHash->records[index].inBytes += outBytes; + flowHash->records[index].inPackets += outPackets; + flowHash->records[index].outFlags |= genericFlow->tcpFlags; + + if (genericFlow->msecFirst < flowHash->records[index].msecFirst) { + flowHash->records[index].msecFirst = genericFlow->msecFirst; } - if (genericFlow->msecLast > kh_key(FlowHash, k).msecLast) { - kh_key(FlowHash, k).msecLast = genericFlow->msecLast; + if (genericFlow->msecLast > flowHash->records[index].msecLast) { + flowHash->records[index].msecLast = genericFlow->msecLast; } - kh_key(FlowHash, k).flows += aggrFlows; + flowHash->records[index].flows += aggrFlows; } else { // no bidir flow found // insert original flow into the cache - r.hashkey = *keymem; - r.hash = forwardHash; - r.hashLen = keyLen; - k = kh_put(FlowHash, FlowHash, r, &ret); - kh_key(FlowHash, k).inBytes = inBytes; - kh_key(FlowHash, k).inPackets = inPackets; - kh_key(FlowHash, k).outBytes = outBytes; - kh_key(FlowHash, k).outPackets = outPackets; - kh_key(FlowHash, k).flows = aggrFlows; - kh_key(FlowHash, k).inFlags = genericFlow->tcpFlags; - kh_key(FlowHash, k).outFlags = 0; - - kh_key(FlowHash, k).msecFirst = genericFlow->msecFirst; - kh_key(FlowHash, k).msecLast = genericFlow->msecLast; + New_HashKey(*keymem, recordHandle, 0); + hashValue.hash = SuperFastHash(*keymem, keyLen); + + int insert; + index = flowHash_add(flowHash, &hashValue, &insert); + flowHash->records[index].inBytes = inBytes; + flowHash->records[index].inPackets = inPackets; + flowHash->records[index].outBytes = outBytes; + flowHash->records[index].outPackets = outPackets; + flowHash->records[index].flows = aggrFlows; + flowHash->records[index].inFlags = genericFlow->tcpFlags; + flowHash->records[index].outFlags = 0; + + flowHash->records[index].msecFirst = genericFlow->msecFirst; + flowHash->records[index].msecLast = genericFlow->msecLast; void *p = malloc(record->size); if (!p) { @@ -1188,7 +1414,8 @@ static void AddBidirFlow(recordHandle_t *recordHandle) { exit(255); } memcpy((void *)p, record, record->size); - kh_key(FlowHash, k).flowrecord = p; + flowHash->records[index].flowrecord = p; + flowHash->records[index].hashkey = *keymem; // keymen got part of the cache *keymem = NULL; @@ -1240,61 +1467,103 @@ void AddFlowCache(recordHandle_t *recordHandle) { keyLen = keymenV4Len; } - if (*keymem == NULL) *keymem = nfmalloc(keyLen); + hashValue_t hashValue = {0}; + if (keyLen > 16) { + if (*keymem == NULL) *keymem = nfmalloc(keyLen); + hashValue.valPtr = *keymem; + hashValue.ptrSize = keyLen; + } else { + *keymem = &hashValue.val; + } -#ifdef DEVEL - void *endOfKey = New_HashKey(*keymem, recordHandle, 0); - printf("Key diff: %zu, len: %zu\n", endOfKey - *keymem, keyLen); -#else New_HashKey(*keymem, recordHandle, 0); -#endif - - FlowHashRecord_t r; - r.hashkey = *keymem; - r.hashLen = keyLen; - r.hash = SuperFastHash(*keymem, keyLen); - - int ret; - khiter_t k = kh_put(FlowHash, FlowHash, r, &ret); - if (ret == 0) { - // flow record found - best case! update all fields - kh_key(FlowHash, k).inBytes += inBytes; - kh_key(FlowHash, k).inPackets += inPackets; - kh_key(FlowHash, k).outBytes += outBytes; - kh_key(FlowHash, k).outPackets += outPackets; - kh_key(FlowHash, k).inFlags |= genericFlow->tcpFlags; - - if (genericFlow->msecFirst < kh_key(FlowHash, k).msecFirst) { - kh_key(FlowHash, k).msecFirst = genericFlow->msecFirst; + hashValue.hash = SuperFastHash(*keymem, keyLen); + + int insert; + int index = flowHash_add(flowHash, &hashValue, &insert); + if (insert == 0) { + // flow record found - update all fields + flowHash->records[index].inBytes += inBytes; + flowHash->records[index].inPackets += inPackets; + flowHash->records[index].outBytes += outBytes; + flowHash->records[index].outPackets += outPackets; + flowHash->records[index].inFlags |= genericFlow->tcpFlags; + + if (genericFlow->msecFirst < flowHash->records[index].msecFirst) { + flowHash->records[index].msecFirst = genericFlow->msecFirst; } - if (genericFlow->msecLast > kh_key(FlowHash, k).msecLast) { - kh_key(FlowHash, k).msecLast = genericFlow->msecLast; + if (genericFlow->msecLast > flowHash->records[index].msecLast) { + flowHash->records[index].msecLast = genericFlow->msecLast; } - kh_key(FlowHash, k).flows += aggrFlows; + flowHash->records[index].flows += aggrFlows; } else { // no flow record found and no TCP/UDP bidir flows. Insert flow record into hash - kh_key(FlowHash, k).inBytes = inBytes; - kh_key(FlowHash, k).inPackets = inPackets; - kh_key(FlowHash, k).outBytes = outBytes; - kh_key(FlowHash, k).outPackets = outPackets; - kh_key(FlowHash, k).flows = aggrFlows; - kh_key(FlowHash, k).inFlags = genericFlow->tcpFlags; - kh_key(FlowHash, k).outFlags = 0; + flowHash->records[index].inBytes = inBytes; + flowHash->records[index].inPackets = inPackets; + flowHash->records[index].outBytes = outBytes; + flowHash->records[index].outPackets = outPackets; + flowHash->records[index].flows = aggrFlows; + flowHash->records[index].inFlags = genericFlow->tcpFlags; + flowHash->records[index].outFlags = 0; - kh_key(FlowHash, k).msecFirst = genericFlow->msecFirst; - kh_key(FlowHash, k).msecLast = genericFlow->msecLast; + flowHash->records[index].msecFirst = genericFlow->msecFirst; + flowHash->records[index].msecLast = genericFlow->msecLast; void *p = nfmalloc(record->size); memcpy((void *)p, record, record->size); - kh_key(FlowHash, k).flowrecord = p; - + flowHash->records[index].flowrecord = p; + flowHash->records[index].hashkey = *keymem; // keymen got part of the cache *keymem = NULL; } } // End of AddFlowCache +// return a linear list of aggregated/listed flows for later sorting +static SortElement_t *GetSortList(size_t *size) { + dbg_printf("Enter %s\n", __func__); + + SortElement_t *list = NULL; + *size = 0; + + uint32_t hashSize = flowHash->count; + + if (hashSize) { // hash table + list = (SortElement_t *)calloc(hashSize, sizeof(SortElement_t)); + if (!list) { + LogError("calloc() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno)); + return NULL; + } + + for (uint32_t i = 0; i < flowHash->count; i++) { + list[i].flowRecord = &(flowHash->records[i]); + } + *size = hashSize; + + } else if (FlowList.NumRecords) { // linear flow list + size_t listSize = FlowList.NumRecords; + if (!listSize) { + return NULL; + } + list = (SortElement_t *)calloc(listSize, sizeof(SortElement_t)); + if (!list) { + LogError("calloc() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno)); + return NULL; + } + + FlowHashRecord_t *flowRecord = FlowList.head; + for (int i = 0; i < listSize; i++) { + list[i].flowRecord = flowRecord; + flowRecord = flowRecord->next; + } + *size = listSize; + } + + return list; + +} // End of GetSortList + // print SortList - apply possible aggregation mask to zero out aggregated fields static inline void PrintSortList(SortElement_t *SortList, uint32_t maxindex, outputParams_t *outputParams, int GuessFlowDirection, RecordPrinter_t print_record, int ascending) { @@ -1304,8 +1573,8 @@ static inline void PrintSortList(SortElement_t *SortList, uint32_t maxindex, out for (int i = 0; i < max; i++) { int j = ascending ? i : maxindex - 1 - i; - FlowHashRecord_t *r = (FlowHashRecord_t *)(SortList[j].record); - recordHeaderV3_t *v3record = (r->flowrecord); + FlowHashRecord_t *flowRecord = SortList[j].flowRecord; + recordHeaderV3_t *v3record = (flowRecord->flowrecord); recordHandle_t recordHandle = {0}; MapRecordHandle(&recordHandle, v3record, i + 1); @@ -1315,19 +1584,19 @@ static inline void PrintSortList(SortElement_t *SortList, uint32_t maxindex, out EXasRouting_t *asRouting = (EXasRouting_t *)recordHandle.extensionList[EXasRoutingID]; EXcntFlow_t *cntFlow = (EXcntFlow_t *)recordHandle.extensionList[EXcntFlowID]; - genericFlow->inPackets = r->inPackets; - genericFlow->inBytes = r->inBytes; - genericFlow->msecFirst = r->msecFirst; - genericFlow->msecLast = r->msecLast; - genericFlow->tcpFlags = r->inFlags; + genericFlow->inPackets = flowRecord->inPackets; + genericFlow->inBytes = flowRecord->inBytes; + genericFlow->msecFirst = flowRecord->msecFirst; + genericFlow->msecLast = flowRecord->msecLast; + genericFlow->tcpFlags = flowRecord->inFlags; EXcntFlow_t tmpCntFlow = {0}; - if (cntFlow == NULL && (r->flows > 1 || r->outPackets)) { + if (cntFlow == NULL && (flowRecord->flows > 1 || flowRecord->outPackets)) { recordHandle.extensionList[EXcntFlowID] = &tmpCntFlow; cntFlow = &tmpCntFlow; - cntFlow->outPackets = r->outPackets; - cntFlow->outBytes = r->outBytes; - cntFlow->flows = r->flows; + cntFlow->outPackets = flowRecord->outPackets; + cntFlow->outBytes = flowRecord->outBytes; + cntFlow->flows = flowRecord->flows; } if (NeedSwapGeneric(GuessFlowDirection, genericFlow)) { @@ -1350,13 +1619,13 @@ static inline void ExportSortList(SortElement_t *SortList, uint32_t maxindex, nf for (int i = 0; i < maxindex; i++) { int j = ascending ? i : maxindex - 1 - i; - FlowHashRecord_t *r = (FlowHashRecord_t *)(SortList[j].record); + FlowHashRecord_t *flowRecord = SortList[j].flowRecord; - recordHeaderV3_t *recordHeaderV3 = (r->flowrecord); + recordHeaderV3_t *recordHeaderV3 = (flowRecord->flowrecord); // check, if we need cntFlow extension int exCntSize = 0; - if (r->outPackets || r->outBytes || r->flows > 1) { + if (flowRecord->outPackets || flowRecord->outBytes || flowRecord->flows > 1) { exCntSize = EXcntFlowSize; } @@ -1387,16 +1656,16 @@ static inline void ExportSortList(SortElement_t *SortList, uint32_t maxindex, nf EXgenericFlow_t *genericFlow = (EXgenericFlow_t *)recordHandle.extensionList[EXgenericFlowID]; if (genericFlow) { - genericFlow->inPackets = r->inPackets; - genericFlow->inBytes = r->inBytes; - genericFlow->msecFirst = r->msecFirst; - genericFlow->msecLast = r->msecLast; - genericFlow->tcpFlags = r->inFlags; + genericFlow->inPackets = flowRecord->inPackets; + genericFlow->inBytes = flowRecord->inBytes; + genericFlow->msecFirst = flowRecord->msecFirst; + genericFlow->msecLast = flowRecord->msecLast; + genericFlow->tcpFlags = flowRecord->inFlags; } if (cntFlow) { - cntFlow->outPackets = r->outPackets; - cntFlow->outBytes = r->outBytes; - cntFlow->flows = r->flows; + cntFlow->outPackets = flowRecord->outPackets; + cntFlow->outBytes = flowRecord->outBytes; + cntFlow->flows = flowRecord->flows; } if (NeedSwapGeneric(GuessFlowDirection, genericFlow)) { @@ -1444,11 +1713,11 @@ void PrintFlowStat(RecordPrinter_t print_record, outputParams_t *outputParams) { unsigned int order_bit = 1 << order_index; if (FlowStat_order & order_bit) { for (int i = 0; i < maxindex; i++) { - FlowHashRecord_t *r = (FlowHashRecord_t *)(SortList[i].record); + FlowHashRecord_t *r = SortList[i].flowRecord; /* if we have some different sort orders, which are not directly available in the FlowHashRecord_t * we need to calculate this value first - such as bpp, bps etc. */ - SortList[i].count = order_mode[order_index].record_function(r, order_mode[order_index].inout); + SortList[i].count = order_mode[order_index].record_function(r); } if (maxindex > 2) { @@ -1467,7 +1736,7 @@ void PrintFlowStat(RecordPrinter_t print_record, outputParams_t *outputParams) { } } PrintProlog(outputParams); - PrintSortList(SortList, maxindex, outputParams, 0, print_record, PrintDirection); + PrintSortList(SortList, maxindex, outputParams, 0, print_record, PrintDirection); // A } } @@ -1485,8 +1754,8 @@ void PrintFlowTable(RecordPrinter_t print_record, outputParams_t *outputParams, if (PrintOrder) { // for any -O print mode for (int i = 0; i < maxindex; i++) { - FlowHashRecord_t *r = (FlowHashRecord_t *)(SortList[i].record); - SortList[i].count = order_mode[PrintOrder].record_function(r, order_mode[PrintOrder].inout); + FlowHashRecord_t *r = SortList[i].flowRecord; + SortList[i].count = order_mode[PrintOrder].record_function(r); } if (maxindex >= 2) { @@ -1497,10 +1766,10 @@ void PrintFlowTable(RecordPrinter_t print_record, outputParams_t *outputParams, } } - PrintSortList(SortList, maxindex, outputParams, GuessDir, print_record, PrintDirection); + PrintSortList(SortList, maxindex, outputParams, GuessDir, print_record, PrintDirection); // A } else { // for -a and no -O sorting required - PrintSortList(SortList, maxindex, outputParams, GuessDir, print_record, PrintDirection); + PrintSortList(SortList, maxindex, outputParams, GuessDir, print_record, PrintDirection); // A } } // End of PrintFlowTable @@ -1515,8 +1784,8 @@ int ExportFlowTable(nffile_t *nffile, int aggregate, int bidir, int GuessDir) { if (PrintOrder) { // for any -O print mode for (int i = 0; i < maxindex; i++) { - FlowHashRecord_t *r = (FlowHashRecord_t *)(SortList[i].record); - SortList[i].count = order_mode[PrintOrder].record_function(r, order_mode[PrintOrder].inout); + FlowHashRecord_t *r = SortList[i].flowRecord; + SortList[i].count = order_mode[PrintOrder].record_function(r); } if (maxindex >= 2) { @@ -1527,7 +1796,7 @@ int ExportFlowTable(nffile_t *nffile, int aggregate, int bidir, int GuessDir) { } } } - ExportSortList(SortList, maxindex, nffile, GuessDir, PrintDirection); + ExportSortList(SortList, maxindex, nffile, GuessDir, PrintDirection); // A return 1;