Skip to content

Commit

Permalink
Merge pull request #147 from neilcook/nits
Browse files Browse the repository at this point in the history
Refactor DNS Lookups mainly
  • Loading branch information
neilcook authored Jul 3, 2017
2 parents 152b9ed + 7218c5b commit af5977d
Show file tree
Hide file tree
Showing 11 changed files with 110 additions and 72 deletions.
3 changes: 3 additions & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ wforce_LDADD = \
$(BOOST_REGEX_LIBS) $(LIBHIREDIS_LIBS) $(LIBCURL) $(LIBCRYPTO_LIBS) \
$(YAMLCPP_LIBS)

EXTRA_wforce_DEPENDENCIES = \
$(EXT_LIBS) $(WFORCE_LIBS)

noinst_HEADERS = \
blacklist.hh \
wforce.hh \
Expand Down
63 changes: 38 additions & 25 deletions common/dns_lookup.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,13 @@
#define GETDNS_STR_PORT "port"

std::mutex resolv_mutx;
std::map<std::string, WFResolver> resolvMap;
std::map<std::string, std::shared_ptr<WFResolver>> resolvMap;

WFResolver::WFResolver(): num_contexts(NUM_GETDNS_CONTEXTS)
{
resolver_list = getdns_list_create();
req_timeout = DNS_REQUEST_TIMEOUT;
mutxp = std::make_shared<std::mutex>();
contextsp = std::make_shared<std::vector<GetDNSContext>>();
context_indexp = std::make_shared<unsigned int>(0);
context_index = 0;
}

WFResolver::~WFResolver()
Expand All @@ -56,12 +54,16 @@ WFResolver::~WFResolver()

void WFResolver::set_request_timeout(uint64_t timeout)
{
// atomic type
req_timeout = timeout;
}

void WFResolver::set_num_contexts(unsigned int nc)
{
num_contexts = nc;
std::lock_guard<std::mutex> lock(mutx);
// only do this before any contexts are created
if (contexts.size() == 0)
num_contexts = nc;
}

void setAddressDict(getdns_dict* dict, const ComboAddress& ca)
Expand Down Expand Up @@ -90,6 +92,7 @@ void WFResolver::add_resolver(const std::string& address, int port)
setAddressDict(resolver_dict, ca);
getdns_dict_set_int(resolver_dict, GETDNS_STR_PORT, port);
size_t list_index=0;
std::lock_guard<std::mutex> lock(mutx);
getdns_list_get_length(resolver_list, &list_index);
getdns_list_set_dict(resolver_list, list_index, resolver_dict);
}
Expand Down Expand Up @@ -126,24 +129,23 @@ void WFResolver::init_dns_contexts()
getdns_context* my_ctx=NULL;

if (create_dns_context(&my_ctx)) {
GetDNSContext ctx;
ctx.context_ctx = my_ctx;
ctx.context_mutex = std::make_shared<std::mutex>();
contextsp->push_back(ctx);
std::shared_ptr<GetDNSContext> ctxp = std::make_shared<GetDNSContext>();
ctxp->context_ctx = my_ctx;
contexts.push_back(ctxp);
}
}
}

bool WFResolver::get_dns_context(GetDNSContext& ret_ctx)
bool WFResolver::get_dns_context(std::shared_ptr<GetDNSContext>* ret_ctx)
{
std::lock_guard<std::mutex> lock(*mutxp);
if (contextsp->size() == 0) {
std::lock_guard<std::mutex> lock(mutx);
if (contexts.size() == 0) {
init_dns_contexts();
}
if (contextsp->size() > 0) {
if (*context_indexp >= num_contexts)
*context_indexp = 0;
ret_ctx = (*contextsp)[(*context_indexp)++]; // copy
if (contexts.size() > 0) {
if (context_index >= num_contexts)
context_index = 0;
*ret_ctx = contexts[context_index++];
return true;
}
else
Expand Down Expand Up @@ -277,22 +279,25 @@ std::vector<std::string> WFResolver::do_lookup_address_by_name_async(getdns_cont

std::vector<std::string> WFResolver::lookup_address_by_name(const std::string& name, boost::optional<size_t> num_retries_p)
{
GetDNSContext context;
std::shared_ptr<GetDNSContext> context;
std::vector<std::string> retvec;
size_t num_retries=0;

if (num_retries_p)
num_retries = *num_retries_p;

size_t num_resolvers=0;
getdns_list_get_length(resolver_list, &num_resolvers);
{
std::lock_guard<std::mutex> lock(mutx);
getdns_list_get_length(resolver_list, &num_resolvers);
}
if (num_retries >= num_resolvers)
num_retries = num_resolvers - 1;

// if we can get a context we can do a lookup
if (get_dns_context(context) == true) {
std::lock_guard<std::mutex> lock(*(context.context_mutex));
retvec = do_lookup_address_by_name_async(context.context_ctx, name, num_retries);
if (get_dns_context(&context) == true) {
std::lock_guard<std::mutex> lock(context->context_mutex);
retvec = do_lookup_address_by_name_async(context->context_ctx, name, num_retries);
}
return retvec;
}
Expand Down Expand Up @@ -401,21 +406,29 @@ std::vector<std::string> WFResolver::do_lookup_name_by_address_async(getdns_cont

std::vector<std::string> WFResolver::lookup_name_by_address(const ComboAddress& ca, boost::optional<size_t> num_retries_p)
{
GetDNSContext context;
std::shared_ptr<GetDNSContext> context;
getdns_dict *address;
std::vector<std::string> retvec;
size_t num_retries=0;

if (num_retries_p)
num_retries = *num_retries_p;

size_t num_resolvers=0;
{
std::lock_guard<std::mutex> lock(mutx);
getdns_list_get_length(resolver_list, &num_resolvers);
}
if (num_retries >= num_resolvers)
num_retries = num_resolvers - 1;

address = getdns_dict_create();

// if we can setup the context we can do the lookup
if (address && get_dns_context(context)) {
std::lock_guard<std::mutex> lock(*(context.context_mutex));
if (address && get_dns_context(&context)) {
std::lock_guard<std::mutex> lock(context->context_mutex);
setAddressDict(address, ca);
retvec = do_lookup_name_by_address_async(context.context_ctx, address, num_retries);
retvec = do_lookup_name_by_address_async(context->context_ctx, address, num_retries);
}
if (address)
getdns_dict_destroy(address);
Expand Down
17 changes: 10 additions & 7 deletions common/dns_lookup.hh
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@
#include <vector>
#include <memory>
#include <mutex>
#include <atomic>
#include "iputils.hh"

struct GetDNSContext {
getdns_context* context_ctx;
std::shared_ptr<std::mutex> context_mutex;
std::mutex context_mutex;
};

struct AsyncThreadUserData {
Expand All @@ -46,6 +47,8 @@ class WFResolver {
public:
WFResolver();
~WFResolver();
WFResolver(const WFResolver&) = delete;
WFResolver& operator=(const WFResolver&) = delete;
void add_resolver(const std::string& address, int port);
void set_request_timeout(uint64_t timeout);
void set_num_contexts(unsigned int nc);
Expand All @@ -55,19 +58,19 @@ public:
protected:
void init_dns_contexts();
bool create_dns_context(getdns_context **context);
bool get_dns_context(GetDNSContext& ret_ctx);
bool get_dns_context(std::shared_ptr<GetDNSContext>* ret_ctx);
std::vector<std::string> do_lookup_address_by_name(getdns_context *context, const std::string& name, size_t num_retries);
std::vector<std::string> do_lookup_name_by_address(getdns_context *context, getdns_dict* addr_dict, size_t num_retries=0);
std::vector<std::string> do_lookup_address_by_name_async(getdns_context *context, const std::string& name, size_t num_retries);
std::vector<std::string> do_lookup_name_by_address_async(getdns_context *context, getdns_dict* addr_dict, size_t num_retries=0);
private:
getdns_list* resolver_list;
uint64_t req_timeout;
std::shared_ptr<std::mutex> mutxp;
std::atomic<uint64_t> req_timeout;
std::mutex mutx;
unsigned int num_contexts;
std::shared_ptr<std::vector<GetDNSContext>> contextsp;
std::shared_ptr<unsigned int> context_indexp;
std::vector<std::shared_ptr<GetDNSContext>> contexts;
std::atomic<unsigned int> context_index;
};

extern std::mutex resolv_mutx;
extern std::map<std::string, WFResolver> resolvMap;
extern std::map<std::string, std::shared_ptr<WFResolver>> resolvMap;
4 changes: 2 additions & 2 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ AC_SUBST([YAHTTP_LIBS], ['-L$(abs_top_builddir)/ext/yahttp/yahttp -lyahttp'])
AC_SUBST([JSON11_CFLAGS], ['-I$(top_srcdir)/ext/json11'])
AC_SUBST([JSON11_LIBS], ['-L$(abs_top_builddir)/ext/json11 -ljson11'])
AC_SUBST([EXT_CFLAGS], ['-I$(top_srcdir)/ext'])
AC_SUBST([EXT_LIBS], ['-L$(abs_top_builddir)/ext/ext -lext'])
AC_SUBST([EXT_LIBS], ['$(abs_top_builddir)/ext/ext/libext.la'])
AC_SUBST([WFORCE_CFLAGS], ['-I$(top_srcdir)'/common])
AC_SUBST([WFORCE_LIBS], ['-L$(abs_top_builddir)/common -lweakforce'])
AC_SUBST([WFORCE_LIBS], ['$(abs_top_builddir)/common/libweakforce.la'])
# We need libcrypto for hash functions
PDNS_CHECK_LIBCRYPTO
# Check for LuaJIT first then Lua
Expand Down
26 changes: 13 additions & 13 deletions docs/manpages/trackalert.conf.5.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ cannot be called inside the report or background functions:
thread uses a separate Lua Context, the number of which is set with
setNumLuaStates(). Defaults to 4 if not specified. For example:

setNumSiblingThreads(2)
setNumSchedulerThreads(2)

* setNumWebHookThreads(\<num threads\>) - Set the number of threads in
the pool used to send webhook events. Defaults to 4 if not
Expand Down Expand Up @@ -131,6 +131,18 @@ cannot be called inside the report or background functions:

setBackground("mybg", backgroundFunc)

* scheduleBackgroundFunc(\<cron string\>, \<background function
name\>) - Tells trackalert to run the specified function according
to the given cron schedule (note the
name given in setBackground() is used, *not* the actual function
name). Note that cron ranges are not currently supported - if you
want to schedule the same function to run for example on two
different days of the week, then you would use two different calls
to this function to achieve that. For example:

scheduleBackgroundFunc("0 0 1 * *", "mybg")
scheduleBackgroundFunc("0 0 6 * *", "mybg")

* setCustomEndpoint(\<name of endpoint\>, \<custom lua function\>) -
Create a new custom REST endpoint with the given name, which when
invoked will call the supplied custom lua function. This allows
Expand Down Expand Up @@ -343,18 +355,6 @@ configuration or within the allow/report/reset functions:

* GeoIPRecord.longitude - The longitude, e.g. -122.08380126953

* scheduleBackgroundFunc(\<cron string\>, \<background function
name\>) - Tells trackalert to run the specified function according
to the given cron schedule (note the
name given in setBackground() is used, *not* the actual function
name). Note that cron ranges are not currently supported - if you
want to schedule the same function to run for example on two
different days of the week, then you would use two different calls
to this function to achieve that. For example:

scheduleBackgroundFunc("0 0 1 * *", "mybg")
scheduleBackgroundFunc("0 0 6 * *", "mybg")

* CustomFuncArgs - The only parameter to custom functions
is a CustomFuncArgs table. This table contains the following fields:

Expand Down
24 changes: 12 additions & 12 deletions elasticqueries/login_fail_query.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,36 +12,36 @@
"aggs": {
"most_sig_ips": {
"significant_terms": {
"field": "remote.keyword",
"field": "remote",
"size": 6
}
},
"most_sig_logins": {
"significant_terms": {
"field": "login.keyword",
"field": "login",
"size": 6
}
},
"most_sig_cc": {
"significant_terms": {
"field": "geoip.country_code2.keyword",
"field": "geoip.country_code2",
"size": 6
}
},
"most_sig_browser": {
"significant_terms": {
"field": "device_attrs.browser.family.keyword",
"field": "device_attrs.browser.family",
"size": 6
},
"aggs": {
"os": {
"terms": {
"field": "device_attrs.os.family.keyword"
"field": "device_attrs.os.family"
},
"aggs": {
"device": {
"terms": {
"field": "device_attrs.device.family.keyword"
"field": "device_attrs.device.family"
}
}
}
Expand All @@ -50,36 +50,36 @@
},
"most_sig_imapc": {
"significant_terms": {
"field": "device_attrs.imapc.family.keyword",
"field": "device_attrs.imapc.family",
"size": 6
},
"aggs": {
"os": {
"terms": {
"field": "device_attrs.os.family.keyword"
"field": "device_attrs.os.family"
}
}
}
},
"most_sig_mobileapp": {
"significant_terms": {
"field": "device_attrs.app.name.keyword",
"field": "device_attrs.app.name",
"size": 6
},
"aggs": {
"brand": {
"terms": {
"field": "device_attrs.app.brand.keyword"
"field": "device_attrs.app.brand"
},
"aggs": {
"os": {
"terms": {
"field": "device_attrs.os.family.keyword"
"field": "device_attrs.os.family"
},
"aggs": {
"device": {
"terms": {
"field": "device_attrs.device.family.keyword"
"field": "device_attrs.device.family"
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions trackalert/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ trackalert_LDADD = \
$(BOOST_REGEX_LIBS) $(LIBHIREDIS_LIBS) $(LIBCURL) $(LIBCRYPTO_LIBS) \
$(YAMLCPP_LIBS) $(GEOIP_LIBS)

EXTRA_trackalert_DEPENDENCIES = \
$(EXT_LIBS) $(WFORCE_LIBS)

noinst_HEADERS = \
trackalert.hh \
trackalert-luastate.hh \
Expand Down
16 changes: 15 additions & 1 deletion trackalert/trackalert-lua.cc
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,21 @@ vector<std::function<void(void)>> setupLua(bool client, bool multi_lua,
if (!multi_lua) {
c_lua.writeFunction("scheduleBackgroundFunc", [bg_func_map](const std::string& cron_str, const std::string& func_name) {
g_bg_schedulerp->cron(cron_str, [func_name] {
g_luamultip->background(func_name);
try {
g_luamultip->background(func_name);
}
catch(LuaContext::ExecutionErrorException& e) {
try {
std::rethrow_if_nested(e);
errlog("Lua background function [%s] exception: %s", func_name, e.what());
}
catch (const std::exception& ne) {
errlog("Exception in background function [%s] exception: %s", func_name, ne.what());
}
catch (const WforceException& ne) {
errlog("Exception in background function [%s] exception: %s", func_name, ne.reason);
}
}
});
});
}
Expand Down
2 changes: 2 additions & 0 deletions trackalert/trackalert.cc
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,8 @@ char* my_generator(const char* text, int state)
"showCustomWebHooks()",
"showPerfStats()",
"showVersion()",
"showCustomEndpoints()",
"setCustomEndpoint(",
"addWebHook(",
"addCustomWebHook(",
"setNumWebHookThreads("
Expand Down
2 changes: 1 addition & 1 deletion trackalert/trackalert.conf
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ end
setReport(report)

function background()
print("foo")
infoLog("Ran background thread", {})
end

setBackground("background", background)
Expand Down
Loading

0 comments on commit af5977d

Please sign in to comment.