Skip to content
This repository was archived by the owner on Mar 5, 2024. It is now read-only.
Open
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 43 additions & 7 deletions src/blockchain_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -879,21 +879,28 @@ reset_ledger_to_snap(Hash, Height, State) ->
snapshot_sync(State1#state{snapshot_info=SnapInfo}).

start_sync(#state{blockchain = Chain, swarm_tid = SwarmTID} = State) ->
case get_random_peer(SwarmTID) of
case get_configured_or_random_peer(SwarmTID) of
no_peers ->
%% try again later when there's peers
schedule_sync(State);
RandomPeer ->
{Pid, Ref} = start_block_sync(SwarmTID, Chain, RandomPeer, [], <<>>),
Peer ->
{Pid, Ref} = start_block_sync(SwarmTID, Chain, Peer, [], <<>>),
lager:info("new block sync starting with Pid: ~p, Ref: ~p, Peer: ~p",
[Pid, Ref, RandomPeer]),
[Pid, Ref, Peer]),
State#state{sync_pid = Pid, sync_ref = Ref}
end.

get_configured_or_random_peer(SwarmTID) ->
case get_configured_sync_peer(SwarmTID) of
undefined -> get_random_peer(SwarmTID);
P -> P
end.

-spec get_random_peer(SwarmTID :: ets:tab()) -> no_peers | string().
get_random_peer(SwarmTID) ->
lager:debug("Get random peer"),
Peerbook = libp2p_swarm:peerbook(SwarmTID),
%% limit peers to random connections with public addresses
%% limit peers to connections with a public address
F = fun(Peer) ->
case application:get_env(blockchain, testing, false) of
false ->
Expand All @@ -906,9 +913,38 @@ get_random_peer(SwarmTID) ->
case libp2p_peerbook:random(Peerbook, [], F, 100) of
false -> no_peers;
{Addr, _Peer} ->
"/p2p/" ++ libp2p_crypto:bin_to_b58(Addr)
libp2p_crypto:pubkey_bin_to_p2p(Addr)
end.

-spec get_configured_sync_peer(ets:tab()) -> string() | undefined.
get_configured_sync_peer(SwarmTID) ->
case application:get_env(blockchain, sync_peers, []) of
[] ->
lager:debug("No sync_peers configured"),
undefined;
ConfiguredPeers = [P|_] when is_list(P) ->
Peerbook = libp2p_swarm:peerbook(SwarmTID),
{Left, Right} = lists:split(rand:uniform(length(ConfiguredPeers)), ConfiguredPeers),
get_configured_sync_peer(SwarmTID, Peerbook, Right ++ Left);
_Invalid ->
lager:warning("Ignoring invalid sync_peers config:~p",[_Invalid]),
undefined
end.

get_configured_sync_peer(SwarmTID, Peerbook, [ Peer | RestPeers ]) ->
case libp2p_swarm:connect(SwarmTID, Peer) of
{ok, _Session} ->
lager:debug("Connected to configured peer ~p",[Peer]),
Peer;
_Error ->
lager:debug("Failed to connect to configured peer ~p: ~p",[Peer, _Error]),
get_configured_sync_peer(SwarmTID, Peerbook, RestPeers)
end;
get_configured_sync_peer(_SwarmTID, _, []) ->
% unable to connect to any of the provided peers
lager:debug("Failed to connect to any configured peer"),
undefined.

reset_sync_timer(State) ->
lager:info("try again in ~p", [?SYNC_TIME]),
erlang:cancel_timer(State#state.sync_timer),
Expand Down Expand Up @@ -1028,7 +1064,7 @@ grab_snapshot(Height, Hash) ->
Chain = blockchain_worker:blockchain(),
SwarmTID = blockchain_swarm:tid(),

case get_random_peer(SwarmTID) of
case get_configured_or_random_peer(SwarmTID) of
no_peers -> {error, no_peers};
Peer ->
case libp2p_swarm:dial_framed_stream(SwarmTID,
Expand Down