diff --git a/src/cli/blockchain_cli_snapshot.erl b/src/cli/blockchain_cli_snapshot.erl index d4c2155fdd..84f7a83ec7 100644 --- a/src/cli/blockchain_cli_snapshot.erl +++ b/src/cli/blockchain_cli_snapshot.erl @@ -139,7 +139,8 @@ snapshot_grab(["snapshot", "grab", HeightStr, HashStr, Filename], [], []) -> Hash = hex_to_binary(HashStr), {ok, Snapshot} = blockchain_worker:grab_snapshot(Height, Hash), %% NOTE: grab_snapshot returns a deserialized snapshot - file:write_file(Filename, blockchain_ledger_snapshot_v1:serialize(Snapshot)) + ok = file:write_file(Filename, blockchain_ledger_snapshot_v1:serialize(Snapshot)), + [clique_status:text(io_lib:format("saved to ~p", [Filename]))] catch _Type:Error -> [clique_status:text(io_lib:format("failed: ~p", [Error]))] @@ -213,7 +214,7 @@ snapshot_list(["snapshot", "list"], [], []) -> Chain = blockchain_worker:blockchain(), Snapshots = blockchain:find_last_snapshots(Chain, 5), case Snapshots of - undefined -> ok; + undefined -> [clique_status:text("No snapshot found")]; _ -> [ clique_status:text(io_lib:format("Height ~p\nHash ~p (~p)\nHave ~p\n", [Height, Hash, binary_to_hex(Hash), diff --git a/src/handlers/blockchain_snapshot_handler.erl b/src/handlers/blockchain_snapshot_handler.erl index ffcb8c64f8..8722607a27 100644 --- a/src/handlers/blockchain_snapshot_handler.erl +++ b/src/handlers/blockchain_snapshot_handler.erl @@ -8,6 +8,7 @@ -behavior(libp2p_framed_stream). +-include_lib("kernel/include/file.hrl"). -include_lib("helium_proto/include/blockchain_snapshot_handler_pb.hrl"). %% ------------------------------------------------------------------ @@ -101,9 +102,8 @@ handle_data(server, Data, #state{chain = Chain} = State) -> #blockchain_snapshot_req_pb{height = _Height, hash = Hash} -> case blockchain:get_snapshot(Hash, Chain) of {ok, {file, FileName}} -> - {ok, Bin} = file:read_file(FileName), - Msg = #blockchain_snapshot_resp_pb{snapshot = Bin}, - {noreply, State, blockchain_snapshot_handler_pb:encode_msg(Msg)}; + lager:info("streaming snapshot from ~p", [FileName]), + {noreply, State, mk_file_stream_fun(FileName)}; {ok, Snap} -> lager:info("sending snapshot ~p", [Hash]), Msg = #blockchain_snapshot_resp_pb{snapshot = Snap}, @@ -121,3 +121,35 @@ handle_data(server, Data, #state{chain = Chain} = State) -> handle_info(_Type, _Msg, State) -> lager:info("unhandled message ~p ~p", [_Type, _Msg]), {noreply, State}. + +%% ------------------------------------------------------------------ +%% internal functions +%% ------------------------------------------------------------------ + +-define(FILE_STREAM_BLOCKSIZE, 4096). + +% send file bytes in chunks +mk_file_stream_fun(File) when is_pid(File) or is_record(File, file_descriptor) -> + case file:read(File, ?FILE_STREAM_BLOCKSIZE) of + {ok, Data} -> + fun() -> + {mk_file_stream_fun(File), Data} + end; + eof -> + file:close(File), + fun() -> ok end + end; +mk_file_stream_fun(FileName) -> + {ok, #file_info{size = Bytes}} = file:read_file_info(FileName), + {ok, File} = file:open(FileName, [read, binary, raw]), + PBSize = small_ints:encode_varint(Bytes), + %% 18 introduces a protocol buffers bytes type + Msg0 = <<18, PBSize/binary>>, + HdrSize = byte_size(Msg0), + {ok, Data} = file:read(File, ?FILE_STREAM_BLOCKSIZE - HdrSize), + Msg = <>, + {Bytes + HdrSize, + fun() -> + {mk_file_stream_fun(File), Msg} + end + }.