Skip to content

Commit cab1bf2

Browse files
committed
update gossip (part)
Signed-off-by: turuslan <[email protected]>
1 parent 5aa09a2 commit cab1bf2

35 files changed

+1509
-308
lines changed

Diff for: include/libp2p/basic/scheduler/manual_scheduler_backend.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,9 @@ namespace libp2p::basic {
6666
}
6767
}
6868

69-
private:
7069
void callDeferred();
7170

71+
private:
7272
/// Current time, set manually
7373
std::chrono::milliseconds current_clock_;
7474

Diff for: include/libp2p/connection/stream_pair.hpp

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/**
2+
* Copyright Quadrivium LLC
3+
* All Rights Reserved
4+
* SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
#pragma once
8+
9+
#include <libp2p/peer/peer_id.hpp>
10+
11+
namespace libp2p::basic {
12+
class Scheduler;
13+
} // namespace libp2p::basic
14+
15+
namespace libp2p::connection {
16+
struct Stream;
17+
18+
std::pair<std::shared_ptr<Stream>, std::shared_ptr<Stream>> streamPair(
19+
std::shared_ptr<basic::Scheduler> post, PeerId peer1, PeerId peer2);
20+
} // namespace libp2p::connection

Diff for: include/libp2p/peer/protocol.hpp

+4
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,7 @@ namespace libp2p::peer {
1717
std::string;
1818

1919
} // namespace libp2p::peer
20+
21+
namespace libp2p {
22+
using peer::ProtocolName;
23+
} // namespace libp2p

Diff for: include/libp2p/protocol/gossip/explicit_peers.hpp

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/**
2+
* Copyright Quadrivium LLC
3+
* All Rights Reserved
4+
* SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
#pragma once
8+
9+
#include <libp2p/peer/peer_id.hpp>
10+
11+
namespace libp2p::protocol::gossip {
12+
class ExplicitPeers {
13+
public:
14+
bool contains(const PeerId &) const {
15+
return false;
16+
}
17+
};
18+
} // namespace libp2p::protocol::gossip

Diff for: include/libp2p/protocol/gossip/gossip.hpp

+32-17
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,18 @@
1010
#include <functional>
1111
#include <set>
1212
#include <string>
13+
#include <unordered_map>
1314
#include <vector>
1415

1516
#include <boost/optional.hpp>
1617

1718
#include <libp2p/common/byteutil.hpp>
1819
#include <libp2p/multi/multiaddress.hpp>
1920
#include <libp2p/peer/peer_id.hpp>
21+
#include <libp2p/peer/protocol.hpp>
2022
#include <libp2p/protocol/common/subscription.hpp>
23+
#include <libp2p/protocol/gossip/peer_kind.hpp>
24+
#include <libp2p/protocol/gossip/score_config.hpp>
2125

2226
namespace libp2p {
2327
struct Host;
@@ -41,6 +45,7 @@ namespace libp2p::protocol::gossip {
4145
struct Config {
4246
/// Network density factors for gossip meshes
4347
size_t D_min = 5;
48+
size_t D = 6;
4449
size_t D_max = 10;
4550

4651
/// Ideal number of connected peers to support the network
@@ -50,27 +55,12 @@ namespace libp2p::protocol::gossip {
5055
/// incoming peers will be rejected
5156
size_t max_connections_num = 1000;
5257

53-
/// Forward messages to all subscribers not in mesh
54-
/// (floodsub mode compatibility)
55-
bool floodsub_forward_mode = false;
56-
5758
/// Forward local message to local subscribers
5859
bool echo_forward_mode = false;
5960

6061
/// Read or write timeout per whole network operation
6162
std::chrono::milliseconds rw_timeout_msec{std::chrono::seconds(10)};
6263

63-
/// Lifetime of a message in message cache
64-
std::chrono::milliseconds message_cache_lifetime_msec{
65-
std::chrono::minutes(2)};
66-
67-
/// Topic's message seen cache lifetime
68-
std::chrono::milliseconds seen_cache_lifetime_msec{
69-
message_cache_lifetime_msec * 3 / 4};
70-
71-
/// Topic's seen cache limit
72-
unsigned seen_cache_limit = 100;
73-
7464
/// Heartbeat interval
7565
std::chrono::milliseconds heartbeat_interval_msec{1000};
7666

@@ -86,11 +76,36 @@ namespace libp2p::protocol::gossip {
8676
/// Max RPC message size
8777
size_t max_message_size = 1 << 24;
8878

89-
/// Protocol version
90-
std::string protocol_version = "/meshsub/1.0.0";
79+
/// Protocol versions
80+
std::unordered_map<ProtocolName, PeerKind> protocol_versions{
81+
{"/floodsub/1.0.0", PeerKind::Floodsub},
82+
{"/meshsub/1.0.0", PeerKind::Gossipsub},
83+
{"/meshsub/1.1.0", PeerKind::Gossipsubv1_1},
84+
{"/meshsub/1.2.0", PeerKind::Gossipsubv1_2},
85+
};
9186

9287
/// Sign published messages
9388
bool sign_messages = false;
89+
90+
size_t history_length{5};
91+
92+
size_t history_gossip{3};
93+
94+
std::chrono::seconds fanout_ttl{60};
95+
96+
std::chrono::seconds duplicate_cache_time{60};
97+
98+
std::chrono::seconds prune_backoff{60};
99+
100+
std::chrono::seconds unsubscribe_backoff{10};
101+
102+
size_t backoff_slack = 1;
103+
104+
bool flood_publish = true;
105+
106+
size_t max_ihave_length = 5000;
107+
108+
ScoreConfig score;
94109
};
95110

96111
using TopicId = std::string;

Diff for: include/libp2p/protocol/gossip/peer_kind.hpp

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/**
2+
* Copyright Quadrivium LLC
3+
* All Rights Reserved
4+
* SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
#pragma once
8+
9+
#include <cstdint>
10+
11+
namespace libp2p::protocol::gossip {
12+
enum class PeerKind : uint8_t {
13+
NotSupported,
14+
Floodsub,
15+
Gossipsub,
16+
Gossipsubv1_1,
17+
Gossipsubv1_2,
18+
};
19+
} // namespace libp2p::protocol::gossip

Diff for: include/libp2p/protocol/gossip/score.hpp

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/**
2+
* Copyright Quadrivium LLC
3+
* All Rights Reserved
4+
* SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
#pragma once
8+
9+
#include <libp2p/peer/peer_id.hpp>
10+
11+
namespace libp2p::protocol::gossip {
12+
class Score {
13+
public:
14+
bool below(const PeerId &peer_id, double threshold) {
15+
return false;
16+
}
17+
};
18+
} // namespace libp2p::protocol::gossip

Diff for: include/libp2p/protocol/gossip/score_config.hpp

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
/**
2+
* Copyright Quadrivium LLC
3+
* All Rights Reserved
4+
* SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
#pragma once
8+
9+
namespace libp2p::protocol::gossip {
10+
struct ScoreConfig {
11+
double zero = 0;
12+
double gossip_threshold = -10;
13+
double publish_threshold = -50;
14+
};
15+
} // namespace libp2p::protocol::gossip

Diff for: include/libp2p/protocol/gossip/time_cache.hpp

+81
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/**
2+
* Copyright Quadrivium LLC
3+
* All Rights Reserved
4+
* SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
#pragma once
8+
9+
#include <chrono>
10+
#include <deque>
11+
#include <qtils/empty.hpp>
12+
#include <unordered_map>
13+
#include <unordered_set>
14+
15+
namespace libp2p::protocol::gossip::time_cache {
16+
using Ttl = std::chrono::milliseconds;
17+
using Clock = std::chrono::steady_clock;
18+
using Time = Clock::time_point;
19+
20+
template <typename K, typename V>
21+
class TimeCache {
22+
public:
23+
TimeCache(Ttl ttl) : ttl_{ttl} {}
24+
25+
bool contains(const K &key) const {
26+
return map_.contains(key);
27+
}
28+
29+
void clearExpired(Time now = Clock::now()) {
30+
while (not expirations_.empty() and expirations_.front().first <= now) {
31+
map_.erase(expirations_.front().second);
32+
expirations_.pop_front();
33+
}
34+
}
35+
36+
V &getOrDefault(const K &key, Time now = Clock::now()) {
37+
clearExpired(now);
38+
auto it = map_.find(key);
39+
if (it == map_.end()) {
40+
it = map_.emplace(key, V()).first;
41+
expirations_.emplace_back(now + ttl_, it);
42+
}
43+
return it->second;
44+
}
45+
46+
private:
47+
using Map = std::unordered_map<K, V>;
48+
49+
Ttl ttl_;
50+
Map map_;
51+
std::deque<std::pair<Clock::time_point, typename Map::iterator>>
52+
expirations_;
53+
};
54+
55+
template <typename K>
56+
class DuplicateCache {
57+
public:
58+
DuplicateCache(Ttl ttl) : cache_{ttl} {}
59+
60+
bool contains(const K &key) const {
61+
return cache_.contains(key);
62+
}
63+
64+
bool insert(const K &key, Time now = Clock::now()) {
65+
cache_.clearExpired(now);
66+
if (cache_.contains(key)) {
67+
return false;
68+
}
69+
cache_.getOrDefault(key);
70+
return true;
71+
}
72+
73+
private:
74+
TimeCache<K, qtils::Empty> cache_;
75+
};
76+
} // namespace libp2p::protocol::gossip::time_cache
77+
78+
namespace libp2p::protocol::gossip {
79+
using time_cache::DuplicateCache;
80+
using time_cache::TimeCache;
81+
} // namespace libp2p::protocol::gossip

Diff for: src/basic/message_read_writer_uvarint.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ namespace libp2p::basic {
3232
}
3333

3434
auto msg_len = varint_res.value().toUInt64();
35+
auto buffer = std::make_shared<std::vector<uint8_t>>(msg_len, 0);
3536
if (0 != msg_len) {
36-
auto buffer = std::make_shared<std::vector<uint8_t>>(msg_len, 0);
3737
self->conn_->read(
3838
*buffer,
3939
msg_len,
@@ -44,7 +44,7 @@ namespace libp2p::basic {
4444
cb(std::move(buffer));
4545
});
4646
} else {
47-
cb(ResultType{});
47+
cb(buffer);
4848
}
4949
});
5050
}

Diff for: src/connection/CMakeLists.txt

+7
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,12 @@ target_link_libraries(p2p_loopback_stream
1919
p2p_peer_id
2020
)
2121

22+
libp2p_add_library(p2p_stream_pair
23+
stream_pair.cpp
24+
)
25+
target_link_libraries(p2p_stream_pair
26+
p2p_peer_id
27+
)
28+
2229
libp2p_install(p2p_loopback_stream)
2330

0 commit comments

Comments
 (0)