From 284da0ce04e2e752d15e612393fe53d1aef5aa16 Mon Sep 17 00:00:00 2001 From: Peter Haag Date: Fri, 31 Jan 2025 12:29:53 +0100 Subject: [PATCH] Allow user selected output format with custom aggregation. See #597 --- src/nfdump/nflowcache.c | 122 +++++++++++++++++++++++++++++++--------- 1 file changed, 96 insertions(+), 26 deletions(-) diff --git a/src/nfdump/nflowcache.c b/src/nfdump/nflowcache.c index a79544f9..f48260d7 100755 --- a/src/nfdump/nflowcache.c +++ b/src/nfdump/nflowcache.c @@ -76,11 +76,15 @@ static struct maskArray_s { static uint32_t maskIndex = 1; // For automatic output format generation in case of custom aggregation +// prepends start and duration time #define AggrPrependFmt "%ts %td " #define AggrPrependCvs "%ts,%td" +// appends counters #define AggrAppendFmt "%pkt %byt %bps %bpp %fl" #define AggrAppendCvs "%pkt,%byt,%bps,%bpp,%fl" +typedef enum { AUTOFMT = 0, AUTOCSV, USERFMT } aggrPrintMode_t; + static struct aggregationElement_s { char *aggrElement; // name of aggregation parameter aggregate_param_t param; // the parameter array @@ -1057,6 +1061,46 @@ int SetRecordStat(char *statType, char *optOrder) { return 1; } // End of SetRecordStat +// parse user supplied output format string +// return NULL (invalid) if non aggregated elements are used in string +static char *ParseAggrOutputFormat(char *print_format, char *arg) { + char *s = strdup(print_format); + + // format already checked for fmt: or csv: - advance to token list + s += 4; + + while (*s) { + if (*s == '%') { + char *token = s++; + while (*s && isalnum(*s)) s++; + char sep = *s; + *s = '\0'; + // check, if this token is active + int active = 0; + int found = 0; + int index = 0; + while (aggregationTable[index].aggrElement) { + if (aggregationTable[index].fmt && strcmp(token, aggregationTable[index].fmt) == 0) { + found = 1; + active += aggregationTable[index].active; + } + index++; + } + + if (found == 1 && active == 0) { + // token not active + LogError("Output token %s not allowed with current aggregation %s", token, arg); + return NULL; + } + *s = sep; + } else { + s++; + } + } + dbg_printf("Custom output format: %s\n", print_format); + return print_format; +} // End of ParseAggrOutputFormat + char *ParseAggregateMask(char *print_format, char *arg) { dbg_printf("Enter %s\n", __func__); if (bidir_flows) { @@ -1064,19 +1108,21 @@ char *ParseAggregateMask(char *print_format, char *arg) { return NULL; } - int modeCSV = 0; + aggrPrintMode_t aggrPrintMode = AUTOFMT; char *sep = " "; char *prepend = AggrPrependFmt; char *append = AggrAppendFmt; if (print_format) { if (strcmp(print_format, "csv") == 0) { - modeCSV = 1; + aggrPrintMode = AUTOCSV; sep = ","; prepend = AggrPrependCvs; append = AggrAppendCvs; + } else if (strncmp(print_format, "fmt:", 4) == 0 || strncmp(print_format, "csv:", 4) == 0) { + aggrPrintMode = USERFMT; } else { - printf("Can not use print format %s to aggregate flows\n", print_format); - exit(EXIT_FAILURE); + LogError("Invalid print format %s to aggregate flows", print_format); + return NULL; } } @@ -1086,34 +1132,37 @@ char *ParseAggregateMask(char *print_format, char *arg) { maxKeyLen = 0; memset((void *)&aggregateInfo, 0, sizeof(aggregateInfo)); - size_t fmt_len = 0; + size_t fmtLen = 0; for (int i = 0; aggregationTable[i].aggrElement != NULL; i++) { - if (modeCSV == 0 && HasGeoDB && aggregationTable[i].fmt) { + if (aggrPrintMode == AUTOFMT && HasGeoDB && aggregationTable[i].fmt) { if (strcmp(aggregationTable[i].fmt, "%sa") == 0) aggregationTable[i].fmt = "%gsa"; if (strcmp(aggregationTable[i].fmt, "%da") == 0) aggregationTable[i].fmt = "%gda"; } - if (aggregationTable[i].fmt) fmt_len += (strlen(aggregationTable[i].fmt) + 1); + if (aggregationTable[i].fmt) fmtLen += (strlen(aggregationTable[i].fmt) + 1); } - fmt_len++; // max fmt string len incl. trailing '\0' + fmtLen++; // max fmt string len incl. trailing '\0' // add format prepend and append length - fmt_len += strlen(prepend) + strlen(append) + 6; // +6 for 'fmt:', 2 spaces + fmtLen += strlen(prepend) + strlen(append) + 6; // +6 for 'fmt:', 2 spaces - char *aggr_fmt = (char *)malloc(fmt_len); - if (!aggr_fmt) { + // create auto format output mode while scanning aggregation + // will be discarded, if user supplies format + char *formatStr = (char *)malloc(fmtLen); + if (!formatStr) { LogError("malloc() error in %s line %d: %s", __FILE__, __LINE__, strerror(errno)); return 0; } - aggr_fmt[0] = '\0'; - if (modeCSV) - fmt_len -= snprintf(aggr_fmt, fmt_len, "csv:%s,", prepend); + formatStr[0] = '\0'; + if (aggrPrintMode == AUTOCSV) + fmtLen -= snprintf(formatStr, fmtLen, "csv:%s,", prepend); else - fmt_len -= snprintf(aggr_fmt, fmt_len, "fmt:%s ", prepend); + fmtLen -= snprintf(formatStr, fmtLen, "fmt:%s ", prepend); uint32_t v4Mask = 0xffffffff; uint64_t v6Mask[2] = {0xffffffffffffffffLL, 0xffffffffffffffffLL}; // separate tokens - char *p = strtok(arg, ","); + char *aggrStr = strdup(arg); + char *p = strtok(aggrStr, ","); while (p) { uint32_t has_mask = 0; // check for subnet bits @@ -1128,6 +1177,7 @@ char *ParseAggregateMask(char *print_format, char *arg) { // IPv4 if (subnet < 1 || subnet > 32) { LogError("Subnet mask length '%d' out of range for IPv4", subnet); + free(aggrStr); return NULL; } v4Mask = 0xffffffff << (32 - subnet); @@ -1137,6 +1187,7 @@ char *ParseAggregateMask(char *print_format, char *arg) { // IPv6 if (subnet < 1 || subnet > 128) { LogError("Subnet mask length '%d' out of range for IPv4", subnet); + free(aggrStr); return NULL; } @@ -1152,6 +1203,7 @@ char *ParseAggregateMask(char *print_format, char *arg) { // rubbish *q = '/'; LogError("Need src4/dst4 or src6/dst6 for IPv4 or IPv6 to aggregate with explicit netmask: '%s'", p); + free(aggrStr); return NULL; } } @@ -1161,32 +1213,36 @@ char *ParseAggregateMask(char *print_format, char *arg) { if (aggregationTable[index].aggrElement == NULL) { LogError("Unknown aggregation field '%s'", p); + free(aggrStr); return NULL; } if (aggregationTable[index].active) { LogError("Duplicate aggregation element: %s", p); + free(aggrStr); return NULL; } if (has_mask) { if (aggregationTable[index].allowMask == 0) { LogError("Element %s does not take any netmask", p); + free(aggrStr); return NULL; } } if (aggregationTable[index].fmt != NULL) { - strncat(aggr_fmt, aggregationTable[index].fmt, fmt_len); - fmt_len -= strlen(aggregationTable[index].fmt); - strncat(aggr_fmt, sep, fmt_len); - fmt_len -= 1; + strncat(formatStr, aggregationTable[index].fmt, fmtLen); + fmtLen -= strlen(aggregationTable[index].fmt); + strncat(formatStr, sep, fmtLen); + fmtLen -= 1; } do { // loop over alternate extensions v4/v6 if (maskIndex >= MaxMaskArraySize) { LogError("Too many netmasks"); + free(aggrStr); return NULL; } if (has_mask) { @@ -1214,13 +1270,14 @@ char *ParseAggregateMask(char *print_format, char *arg) { if (elementCount == 0) { LogError("No aggregation specified!"); + free(aggrStr); return NULL; } aggregateInfo[elementCount] = -1; #ifdef DEVEL printf("Aggregate key: maxKeyLen: %zu bytes\n", maxKeyLen); - printf("Aggregate format string: '%s'\n", aggr_fmt); + printf("Aggregate format string: '%s'\n", formatStr); printf("Aggregate stack:\n"); for (int i = 0; aggregateInfo[i] >= 0; i++) { @@ -1238,12 +1295,25 @@ char *ParseAggregateMask(char *print_format, char *arg) { } #endif - if (modeCSV == 0) { - strncat(aggr_fmt, sep, fmt_len); - fmt_len--; + + if (aggrPrintMode == USERFMT) { + // check user supplied output format - discard auto format + free(formatStr); + char *formatStr = ParseAggrOutputFormat(print_format, arg); + free(aggrStr); + return formatStr; } - strncat(aggr_fmt, append, fmt_len); - return aggr_fmt; + + // else for auto created output format, add 'append' format + if (aggrPrintMode == AUTOFMT) { + strncat(formatStr, sep, fmtLen); + fmtLen--; + } + strncat(formatStr, append, fmtLen); + + dbg_printf("Auto output format: %s\n", formatStr); + free(aggrStr); + return formatStr; } // End of ParseAggregateMask