Skip to content

Commit

Permalink
Allow user selected output format with custom aggregation. See #597
Browse files Browse the repository at this point in the history
  • Loading branch information
phaag committed Jan 31, 2025
1 parent 0618ad9 commit 284da0c
Showing 1 changed file with 96 additions and 26 deletions.
122 changes: 96 additions & 26 deletions src/nfdump/nflowcache.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,15 @@ static struct maskArray_s {
static uint32_t maskIndex = 1;

// For automatic output format generation in case of custom aggregation
// prepends start and duration time
#define AggrPrependFmt "%ts %td "
#define AggrPrependCvs "%ts,%td"
// appends counters
#define AggrAppendFmt "%pkt %byt %bps %bpp %fl"
#define AggrAppendCvs "%pkt,%byt,%bps,%bpp,%fl"

typedef enum { AUTOFMT = 0, AUTOCSV, USERFMT } aggrPrintMode_t;

static struct aggregationElement_s {
char *aggrElement; // name of aggregation parameter
aggregate_param_t param; // the parameter array
Expand Down Expand Up @@ -1057,26 +1061,68 @@ int SetRecordStat(char *statType, char *optOrder) {
return 1;
} // End of SetRecordStat

// parse user supplied output format string
// return NULL (invalid) if non aggregated elements are used in string
static char *ParseAggrOutputFormat(char *print_format, char *arg) {
char *s = strdup(print_format);

// format already checked for fmt: or csv: - advance to token list
s += 4;

while (*s) {
if (*s == '%') {
char *token = s++;
while (*s && isalnum(*s)) s++;
char sep = *s;
*s = '\0';
// check, if this token is active
int active = 0;
int found = 0;
int index = 0;
while (aggregationTable[index].aggrElement) {
if (aggregationTable[index].fmt && strcmp(token, aggregationTable[index].fmt) == 0) {
found = 1;
active += aggregationTable[index].active;
}
index++;
}

if (found == 1 && active == 0) {
// token not active
LogError("Output token %s not allowed with current aggregation %s", token, arg);
return NULL;
}
*s = sep;
} else {
s++;
}
}
dbg_printf("Custom output format: %s\n", print_format);
return print_format;
} // End of ParseAggrOutputFormat

char *ParseAggregateMask(char *print_format, char *arg) {
dbg_printf("Enter %s\n", __func__);
if (bidir_flows) {
LogError("Can not set custom aggregation in bidir mode");
return NULL;
}

int modeCSV = 0;
aggrPrintMode_t aggrPrintMode = AUTOFMT;
char *sep = " ";
char *prepend = AggrPrependFmt;
char *append = AggrAppendFmt;
if (print_format) {
if (strcmp(print_format, "csv") == 0) {
modeCSV = 1;
aggrPrintMode = AUTOCSV;
sep = ",";
prepend = AggrPrependCvs;
append = AggrAppendCvs;
} else if (strncmp(print_format, "fmt:", 4) == 0 || strncmp(print_format, "csv:", 4) == 0) {
aggrPrintMode = USERFMT;
} else {
printf("Can not use print format %s to aggregate flows\n", print_format);
exit(EXIT_FAILURE);
LogError("Invalid print format %s to aggregate flows", print_format);
return NULL;
}
}

Expand All @@ -1086,34 +1132,37 @@ char *ParseAggregateMask(char *print_format, char *arg) {
maxKeyLen = 0;
memset((void *)&aggregateInfo, 0, sizeof(aggregateInfo));

size_t fmt_len = 0;
size_t fmtLen = 0;
for (int i = 0; aggregationTable[i].aggrElement != NULL; i++) {
if (modeCSV == 0 && HasGeoDB && aggregationTable[i].fmt) {
if (aggrPrintMode == AUTOFMT && HasGeoDB && aggregationTable[i].fmt) {
if (strcmp(aggregationTable[i].fmt, "%sa") == 0) aggregationTable[i].fmt = "%gsa";
if (strcmp(aggregationTable[i].fmt, "%da") == 0) aggregationTable[i].fmt = "%gda";
}
if (aggregationTable[i].fmt) fmt_len += (strlen(aggregationTable[i].fmt) + 1);
if (aggregationTable[i].fmt) fmtLen += (strlen(aggregationTable[i].fmt) + 1);
}
fmt_len++; // max fmt string len incl. trailing '\0'
fmtLen++; // max fmt string len incl. trailing '\0'

// add format prepend and append length
fmt_len += strlen(prepend) + strlen(append) + 6; // +6 for 'fmt:', 2 spaces
fmtLen += strlen(prepend) + strlen(append) + 6; // +6 for 'fmt:', 2 spaces

char *aggr_fmt = (char *)malloc(fmt_len);
if (!aggr_fmt) {
// create auto format output mode while scanning aggregation
// will be discarded, if user supplies format
char *formatStr = (char *)malloc(fmtLen);
if (!formatStr) {
LogError("malloc() error in %s line %d: %s", __FILE__, __LINE__, strerror(errno));
return 0;
}
aggr_fmt[0] = '\0';
if (modeCSV)
fmt_len -= snprintf(aggr_fmt, fmt_len, "csv:%s,", prepend);
formatStr[0] = '\0';
if (aggrPrintMode == AUTOCSV)
fmtLen -= snprintf(formatStr, fmtLen, "csv:%s,", prepend);
else
fmt_len -= snprintf(aggr_fmt, fmt_len, "fmt:%s ", prepend);
fmtLen -= snprintf(formatStr, fmtLen, "fmt:%s ", prepend);

uint32_t v4Mask = 0xffffffff;
uint64_t v6Mask[2] = {0xffffffffffffffffLL, 0xffffffffffffffffLL};
// separate tokens
char *p = strtok(arg, ",");
char *aggrStr = strdup(arg);
char *p = strtok(aggrStr, ",");
while (p) {
uint32_t has_mask = 0;
// check for subnet bits
Expand All @@ -1128,6 +1177,7 @@ char *ParseAggregateMask(char *print_format, char *arg) {
// IPv4
if (subnet < 1 || subnet > 32) {
LogError("Subnet mask length '%d' out of range for IPv4", subnet);
free(aggrStr);
return NULL;
}
v4Mask = 0xffffffff << (32 - subnet);
Expand All @@ -1137,6 +1187,7 @@ char *ParseAggregateMask(char *print_format, char *arg) {
// IPv6
if (subnet < 1 || subnet > 128) {
LogError("Subnet mask length '%d' out of range for IPv4", subnet);
free(aggrStr);
return NULL;
}

Expand All @@ -1152,6 +1203,7 @@ char *ParseAggregateMask(char *print_format, char *arg) {
// rubbish
*q = '/';
LogError("Need src4/dst4 or src6/dst6 for IPv4 or IPv6 to aggregate with explicit netmask: '%s'", p);
free(aggrStr);
return NULL;
}
}
Expand All @@ -1161,32 +1213,36 @@ char *ParseAggregateMask(char *print_format, char *arg) {

if (aggregationTable[index].aggrElement == NULL) {
LogError("Unknown aggregation field '%s'", p);
free(aggrStr);
return NULL;
}

if (aggregationTable[index].active) {
LogError("Duplicate aggregation element: %s", p);
free(aggrStr);
return NULL;
}

if (has_mask) {
if (aggregationTable[index].allowMask == 0) {
LogError("Element %s does not take any netmask", p);
free(aggrStr);
return NULL;
}
}

if (aggregationTable[index].fmt != NULL) {
strncat(aggr_fmt, aggregationTable[index].fmt, fmt_len);
fmt_len -= strlen(aggregationTable[index].fmt);
strncat(aggr_fmt, sep, fmt_len);
fmt_len -= 1;
strncat(formatStr, aggregationTable[index].fmt, fmtLen);
fmtLen -= strlen(aggregationTable[index].fmt);
strncat(formatStr, sep, fmtLen);
fmtLen -= 1;
}

do {
// loop over alternate extensions v4/v6
if (maskIndex >= MaxMaskArraySize) {
LogError("Too many netmasks");
free(aggrStr);
return NULL;
}
if (has_mask) {
Expand Down Expand Up @@ -1214,13 +1270,14 @@ char *ParseAggregateMask(char *print_format, char *arg) {

if (elementCount == 0) {
LogError("No aggregation specified!");
free(aggrStr);
return NULL;
}
aggregateInfo[elementCount] = -1;

#ifdef DEVEL
printf("Aggregate key: maxKeyLen: %zu bytes\n", maxKeyLen);
printf("Aggregate format string: '%s'\n", aggr_fmt);
printf("Aggregate format string: '%s'\n", formatStr);

printf("Aggregate stack:\n");
for (int i = 0; aggregateInfo[i] >= 0; i++) {
Expand All @@ -1238,12 +1295,25 @@ char *ParseAggregateMask(char *print_format, char *arg) {
}

#endif
if (modeCSV == 0) {
strncat(aggr_fmt, sep, fmt_len);
fmt_len--;

if (aggrPrintMode == USERFMT) {
// check user supplied output format - discard auto format
free(formatStr);
char *formatStr = ParseAggrOutputFormat(print_format, arg);
free(aggrStr);
return formatStr;
}
strncat(aggr_fmt, append, fmt_len);
return aggr_fmt;

// else for auto created output format, add 'append' format
if (aggrPrintMode == AUTOFMT) {
strncat(formatStr, sep, fmtLen);
fmtLen--;
}
strncat(formatStr, append, fmtLen);

dbg_printf("Auto output format: %s\n", formatStr);
free(aggrStr);
return formatStr;

} // End of ParseAggregateMask

Expand Down

0 comments on commit 284da0c

Please sign in to comment.