Skip to content

Commit e2da075

Browse files
committed
batch-gossip
1 parent 9458dd3 commit e2da075

File tree

6 files changed

+57
-110
lines changed

6 files changed

+57
-110
lines changed

CMakeLists.txt

+5-5
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,11 @@ if(NOT CMAKE_BUILD_TYPE)
9494
set(CMAKE_BUILD_TYPE RelWithDebInfo)
9595
endif()
9696

97-
set(debug OFF)
98-
if(CMAKE_BUILD_TYPE MATCHES "[Dd][Ee][Bb][Uu][Gg]")
99-
set(debug ON)
100-
add_definitions(-DLOKINET_DEBUG)
101-
endif()
97+
# set(debug OFF)
98+
# if(CMAKE_BUILD_TYPE MATCHES "[Dd][Ee][Bb][Uu][Gg]")
99+
# set(debug ON)
100+
# add_definitions(-DLOKINET_DEBUG)
101+
# endif()
102102

103103
option(WARN_DEPRECATED "show deprecation warnings" OFF)
104104

llarp/link/link_manager.cpp

+22-23
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,17 @@ namespace llarp
8181
return link_manager.router().loop()->call_get([this, remote]() { return service_conns.count(remote); });
8282
}
8383

84+
void Endpoint::for_each_service_conn(
85+
std::function<void(RouterID, std::shared_ptr<link::Connection>)> func, bool active_only)
86+
{
87+
assert(link_manager.router().loop()->in_event_loop());
88+
89+
std::ranges::for_each(service_conns.begin(), service_conns.end(), [&](auto c) mutable {
90+
if (c.second and (active_only ? c.second->is_active.load() : true))
91+
func(c.first, c.second);
92+
});
93+
}
94+
8495
void Endpoint::for_each_connection(std::function<void(const RouterID&, link::Connection&)> func)
8596
{
8697
link_manager.router().loop()->call([this, func = std::move(func)]() mutable {
@@ -174,8 +185,8 @@ namespace llarp
174185
return link_manager.router().loop()->call_get([&]() {
175186
size_t n{};
176187

177-
for (const auto& [_, c] : service_conns)
178-
if (c and (active_only ? c->is_active.load() : true))
188+
for (const auto& [_, conn] : service_conns)
189+
if (conn and (active_only ? conn->is_active.load() : true))
179190
++n;
180191

181192
return n;
@@ -328,7 +339,7 @@ namespace llarp
328339
_router.loop()->call_later(approximate_time(5s, 5), [&]() {
329340
regenerate_and_gossip_rc();
330341
_gossip_ticker =
331-
_router.loop()->call_every(_router._gossip_interval, [this]() { regenerate_and_gossip_rc(); });
342+
_router.loop()->call_every(_router._gossip_interval, [this]() mutable { regenerate_and_gossip_rc(); });
332343
});
333344
}
334345

@@ -803,28 +814,16 @@ namespace llarp
803814
_router.save_rc();
804815
}
805816

806-
// TESTNET: TODO: use batch sender
807817
void LinkManager::gossip_rc(const RouterID& last_sender, const RemoteRC& rc)
808818
{
809-
int count{};
810-
const auto& gossip_src = rc.router_id();
811-
812-
for (auto& [rid, conn] : ep->service_conns)
813-
{
814-
if (not conn or not conn->is_active)
815-
continue;
816-
817-
// don't send back to the gossip source or the last sender
818-
if (rid == gossip_src or rid == last_sender)
819-
continue;
820-
821-
count +=
822-
send_control_message(rid, "gossip_rc", GossipRC::serialize(last_sender, rc), [](oxen::quic::message) {
823-
log::trace(logcat, "PLACEHOLDER FOR GOSSIP RC RESPONSE HANDLER");
824-
});
825-
}
819+
ep->for_each_service_conn(
820+
[last_sender = last_sender, gossip_src = rc.router_id(), payload = GossipRC::serialize(last_sender, rc)](
821+
RouterID rid, std::shared_ptr<link::Connection> conn) mutable {
822+
if (rid == gossip_src or rid == last_sender)
823+
return;
826824

827-
log::critical(logcat, "Dispatched {} GossipRC requests!", count);
825+
conn->control_stream->command("gossip_rc", payload, [](auto) {});
826+
});
828827
}
829828

830829
void LinkManager::handle_gossip_rc(oxen::quic::message m)
@@ -920,7 +919,7 @@ namespace llarp
920919
logcat,
921920
"Bootstrap node confirmed RID:{} is registered; approving fetch request and saving RC!",
922921
remote_rc.router_id());
923-
_router.loop()->call_soon([&, remote_rc]() { gossip_rc(_router.local_rid(), remote_rc); });
922+
_router.loop()->call_soon([&, remote_rc]() mutable { gossip_rc(_router.local_rid(), remote_rc); });
924923
}
925924
else
926925
log::critical(

llarp/link/link_manager.hpp

+3
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ namespace llarp
9393

9494
void for_each_connection(std::function<void(const RouterID&, link::Connection&)> func);
9595

96+
void for_each_service_conn(
97+
std::function<void(RouterID, std::shared_ptr<link::Connection>)> func, bool active_only = true);
98+
9699
void close_connection(RouterID rid);
97100

98101
void close_all();

llarp/nodedb.cpp

+26-74
Original file line numberDiff line numberDiff line change
@@ -370,19 +370,6 @@ namespace llarp
370370
});
371371
}
372372

373-
void NodeDB::fetch_rcs(std::vector<RouterID> needed, bt_control_response_hook func)
374-
{
375-
if (_router.is_stopping() || not _router.is_running())
376-
{
377-
log::debug(logcat, "NodeDB unable to continue RC fetch -- router is stopped!");
378-
return post_rc_fetch(true);
379-
}
380-
381-
log::debug(
382-
logcat, "Dispatching FetchRC's request to {} for {} RCs!", fetch_source.short_string(), needed.size());
383-
_router.link_manager()->fetch_rcs(fetch_source, FetchRC::serialize(std::move(needed)), std::move(func));
384-
}
385-
386373
void NodeDB::fetch_rcs()
387374
{
388375
if (_router.is_stopping() || not _router.is_running())
@@ -393,71 +380,36 @@ namespace llarp
393380

394381
cycle_fetch_source();
395382

396-
return fetch_rcs(get_expired_rcs(), [this, source = fetch_source](oxen::quic::message m) mutable {
397-
if (not m)
398-
{
399-
log::warning(
400-
logcat, "RC fetch from {} {}", source, m.timed_out ? "timed out" : "failed: {}"_format(m.view()));
401-
}
402-
else
403-
{
404-
try
405-
{
406-
std::set<RemoteRC> rcs = FetchRC::deserialize_response(oxenc::bt_dict_consumer{m.body()});
383+
log::debug(logcat, "Dispatching FetchRC's request to {}!", fetch_source.short_string());
407384

408-
return rc_fetch_result(std::move(rcs));
409-
}
410-
catch (const std::exception& e)
385+
_router.link_manager()->fetch_rcs(
386+
fetch_source,
387+
FetchRC::serialize(get_expired_rcs()),
388+
[this, source = fetch_source](oxen::quic::message m) mutable {
389+
if (not m)
411390
{
412-
log::warning(logcat, "Failed to parse RC fetch response from {}: {}", source, e.what());
391+
log::warning(
392+
logcat,
393+
"RC fetch from {} {}",
394+
source,
395+
m.timed_out ? "timed out" : "failed: {}"_format(m.view()));
413396
}
414-
}
397+
else
398+
{
399+
try
400+
{
401+
std::set<RemoteRC> rcs = FetchRC::deserialize_response(oxenc::bt_dict_consumer{m.body()});
415402

416-
rc_fetch_result();
417-
});
403+
return rc_fetch_result(std::move(rcs));
404+
}
405+
catch (const std::exception& e)
406+
{
407+
log::warning(logcat, "Failed to parse RC fetch response from {}: {}", source, e.what());
408+
}
409+
}
418410

419-
// std::vector<RouterID> needed = get_expired_rcs();
420-
421-
// cycle_fetch_source();
422-
// auto& src = fetch_source;
423-
// log::debug(logcat, "Dispatching FetchRC's request to {} for {} RCs!", src.short_string(), needed.size());
424-
425-
// _router.link_manager()->fetch_rcs(
426-
// src, FetchRCMessage::serialize(needed), [this, source = fetch_source](oxen::quic::message m) mutable {
427-
// if (not m)
428-
// {
429-
// log::warning(
430-
// logcat,
431-
// "RC fetch from {} {}",
432-
// source,
433-
// m.timed_out ? "timed out" : "failed: {}"_format(m.view()));
434-
// }
435-
// else
436-
// {
437-
// try
438-
// {
439-
// std::set<RemoteRC> rcs;
440-
// oxenc::bt_dict_consumer btdc{m.body()};
441-
442-
// btdc.required("r");
443-
444-
// {
445-
// auto sublist = btdc.consume_list_consumer();
446-
447-
// while (not sublist.is_finished())
448-
// rcs.emplace(sublist.consume_dict_data());
449-
// }
450-
451-
// return rc_fetch_result(std::move(rcs));
452-
// }
453-
// catch (const std::exception& e)
454-
// {
455-
// log::warning(logcat, "Failed to parse RC fetch response from {}: {}", source, e.what());
456-
// }
457-
// }
458-
459-
// rc_fetch_result();
460-
// });
411+
rc_fetch_result();
412+
});
461413
}
462414

463415
void NodeDB::rc_fetch_result(std::optional<std::set<RemoteRC>> result)
@@ -501,7 +453,7 @@ namespace llarp
501453
auto& src = fetch_source;
502454
log::debug(logcat, "New fetch source is {}", src);
503455

504-
auto send_hook = [this, src](const bt_control_stream& control) mutable {
456+
auto send_hook = [this, src = src](const bt_control_stream& control) mutable {
505457
std::ranges::for_each(rid_sources.begin(), rid_sources.end(), [&](const RouterID& target) mutable {
506458
if (target == src)
507459
return;

llarp/nodedb.hpp

-3
Original file line numberDiff line numberDiff line change
@@ -336,9 +336,6 @@ namespace llarp
336336

337337
bool verify_store_gossip_rc(const RemoteRC& rc);
338338

339-
// public method that can be used in session initiation
340-
void fetch_rcs(std::vector<RouterID> needed, bt_control_response_hook func);
341-
342339
private:
343340
void fetch_rcs();
344341
void fetch_rids();

llarp/util/time.cpp

+1-5
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,7 @@ namespace llarp
3535

3636
std::chrono::milliseconds time_now_ms()
3737
{
38-
auto t = uptime();
39-
#ifdef TESTNET_SPEED
40-
t /= uint64_t{TESTNET_SPEED};
41-
#endif
42-
return t + time_since_epoch<std::chrono::milliseconds, std::chrono::system_clock>(started_at_system);
38+
return uptime() + time_since_epoch<std::chrono::milliseconds, std::chrono::system_clock>(started_at_system);
4339
}
4440

4541
nlohmann::json to_json(const std::chrono::milliseconds& t) { return to_milliseconds(t); }

0 commit comments

Comments
 (0)