diff --git a/rebar.lock b/rebar.lock index 8478eb81d5..51df8aade5 100644 --- a/rebar.lock +++ b/rebar.lock @@ -71,7 +71,7 @@ {<<"hackney">>,{pkg,<<"hackney">>,<<"1.18.0">>},0}, {<<"helium_proto">>, {git,"https://github.com/helium/proto.git", - {ref,"30f17c5d1a7942297923f4e743c681c46f917fc3"}}, + {ref,"4e1e0d97358d736ce861ef8d8d4a52eeb6ae9a4b"}}, 0}, {<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.2.3">>},2}, {<<"idna">>,{pkg,<<"idna">>,<<"6.1.1">>},1}, diff --git a/src/handlers/blockchain_sync_handler.erl b/src/handlers/blockchain_sync_handler.erl index 9e25322e71..385bd56af0 100644 --- a/src/handlers/blockchain_sync_handler.erl +++ b/src/handlers/blockchain_sync_handler.erl @@ -125,7 +125,7 @@ handle_data(client, Data0, #state{blockchain=Chain, path=Path, gossiped_hash=Gos ?SYNC_PROTOCOL_V1 -> Data0; ?SYNC_PROTOCOL_V2 -> zlib:uncompress(Data0) end, - #blockchain_sync_blocks_pb{blocks=BinBlocks} = + #blockchain_sync_blocks_pb{final=Final, blocks=BinBlocks} = blockchain_sync_handler_pb:decode_msg(Data, blockchain_sync_blocks_pb), Blocks = [blockchain_block:deserialize(B) || B <- BinBlocks], @@ -139,8 +139,12 @@ handle_data(client, Data0, #state{blockchain=Chain, path=Path, gossiped_hash=Gos %% nothing was plausible, see if it has anything else {noreply, State, blockchain_sync_handler_pb:encode_msg(#blockchain_sync_req_pb{msg={response, true}})}; HighestPlausible -> - lager:info("Eagerly re-gossiping ~p", [blockchain_block:height(HighestPlausible)]), - blockchain_gossip_handler:regossip_block(HighestPlausible, SwarmTID), + case Final of + true -> + lager:info("Eagerly re-gossiping ~p", [blockchain_block:height(HighestPlausible)]), + blockchain_gossip_handler:regossip_block(HighestPlausible, SwarmTID); + false -> ok + end, %% do this in a spawn so that the connection dying does not stop adding blocks {Pid, Ref} = spawn_monitor(fun() -> %% this will check any plausible blocks we have and add them to the chain if possible @@ -154,54 +158,22 @@ handle_data(client, Data0, #state{blockchain=Chain, path=Path, gossiped_hash=Gos {stop, normal, State, blockchain_sync_handler_pb:encode_msg(#blockchain_sync_req_pb{msg={response, false}})} end end; - + handle_data(server, Data, #state{blockchain=Blockchain, batch_size=BatchSize, batches_sent=Sent, batch_limit=Limit, - path=Path, requested=StRequested}=State) -> + requested=StRequested}=State) -> case blockchain_sync_handler_pb:decode_msg(Data, blockchain_sync_req_pb) of #blockchain_sync_req_pb{msg={hash, #blockchain_sync_hash_pb{hash = Hash, heights = Requested}}} -> {Blocks, Requested1} = build_blocks(Requested, Hash, Blockchain, BatchSize), - case Blocks of - [] -> - {stop, normal, State}; - [_|_] -> - Msg = mk_msg(Blocks, Path), - case Requested1 == [] andalso Requested /= [] of - true -> - {stop, normal, State, Msg}; - _ -> - lager:info("sending blocks ~p to sync peer", [element(1, lists:unzip(Blocks))]), - {LastHeight, _LastBlock} = lists:last(Blocks), - {noreply, State#state{batches_sent=Sent+1, - last_block_height=LastHeight, - requested = Requested1}, - Msg} - end - end; + maybe_send_blocks(Blocks, Requested1, Requested, State); #blockchain_sync_req_pb{msg={response, true}} when Sent < Limit, State#state.last_block_height /= undefined -> StartingBlockHeight = State#state.last_block_height, {Blocks, Requested1} = build_blocks(StRequested, StartingBlockHeight, Blockchain, BatchSize), - case Blocks of - [] -> - {stop, normal, State}; - _ -> - Msg = mk_msg(Blocks, Path), - case Requested1 == [] andalso StRequested /= [] of - true -> - {stop, normal, State, Msg}; - _ -> - lager:info("sending blocks ~p to sync peer", [element(1, lists:unzip(Blocks))]), - {LastHeight, _LastBlock} = lists:last(Blocks), - {noreply, State#state{batches_sent=Sent+1, - last_block_height=LastHeight, - requested = Requested1}, - Msg} - end - end; + maybe_send_blocks(Blocks, Requested1, StRequested, State); _ -> %% ack was false, block was undefined, limit was hit or the message was not understood {stop, normal, State} @@ -265,11 +237,30 @@ build_blocks(R, Hash, Blockchain, BatchSize) when is_list(R) -> {Blocks ++ ExtraBlocks, R -- R2}. -mk_msg(Blocks, Path) -> - Msg1 = #blockchain_sync_blocks_pb{blocks= [B || {_H, B} <- Blocks]}, +-spec maybe_send_blocks(list(), list(), list(), #state{}) -> {stop, normal, #state{}} | + {stop, normal, #state{}, binary()} | + {noreply, #state{}, binary()}. +maybe_send_blocks([], _, _, State) -> + {stop, normal, State}; +maybe_send_blocks(BlockTuples, Requested, OldRequested,#state{path=Path, batches_sent=Sent} = State) -> + Final = Requested == [] andalso OldRequested /= [], + {Heights, Blocks} = lists:unzip(BlockTuples), + lager:info("sending blocks ~p to sync peer", [Heights]), + Msg1 = #blockchain_sync_blocks_pb{final=Final, blocks=Blocks}, Msg0 = blockchain_sync_handler_pb:encode_msg(Msg1), Msg = case Path of ?SYNC_PROTOCOL_V1 -> Msg0; ?SYNC_PROTOCOL_V2 -> zlib:compress(Msg0) end, - Msg. + case Final of + true -> + {stop, normal, State, Msg}; + false -> + {LastHeight, _LastBlock} = lists:last(BlockTuples), + {noreply, State#state{batches_sent=Sent+1, + last_block_height=LastHeight, + requested = Requested}, + Msg} + end. + +