diff --git a/.gitignore b/.gitignore index b1a1f24f43..811033d910 100644 --- a/.gitignore +++ b/.gitignore @@ -17,6 +17,7 @@ _build *.iml rebar3.crashdump data/ +!src/data/ .DS_Store src/pb/ src/grpc/autogen diff --git a/src/blockchain.erl b/src/blockchain.erl index 4268551ef5..f58e93d870 100644 --- a/src/blockchain.erl +++ b/src/blockchain.erl @@ -1460,26 +1460,116 @@ build(Height, Blockchain, N, Acc) -> end end. +-spec build_hash_chain( + H, + blockchain_block:block(), + blockchain(), + rocksdb:cf_handle() +) -> + [H, ...] when H :: blockchain_block:hash(). +build_hash_chain(StopHash, StartBlock, #blockchain{db=DB}, CF) -> + %% XXX This parallelized build_hash_chain has a potential PERF DRAWBACK: + %% + %% Because it processes the WHOLE of blocks CF BEFORE knowing what will + %% actually be needed, if the caller needs a significantly smaller + %% segment than the whole - we end up doing a lot of wasted + %% deserializations. + %% + %% Some ideas for solutions: + %% A. eliminate the deserialization step entirely, by maintaining + %% a rocksdb CF of ChildHash->ParentHash mappings; + %% B. dispatch between serial and parallel versions based on some + %% conditions/configs; + %% C. accept the redundancy, but create an opportunity to terminate + %% early once a long-enough segment is built - reimplement Parents + %% as a cache process which does two things: + %% 1. starts a background job which fills the cache with parents; + %% 2. accepts parent requests and either retrieves responses from + %% cache or computes them (updating the cache). + Parents = + maps:from_list( + data_stream:pmap_to_bag( + rocksdb_stream(DB, CF), + fun ({<>, <>}) -> + ChildBlock = blockchain_block:deserialize(ChildBlockBin), + <> = blockchain_block:prev_hash(ChildBlock), + {ChildHash, ParentHash} + end + ) + ), + StartHash = blockchain_block:hash_block(StartBlock), + HashChain = trace_lineage(Parents, StopHash, StartHash), + %% TODO Predicate the disjointness check on a config? 
+ case find_orphans(Parents) of + [] -> + ok; + [_|_]=Orphans -> + lager:warning( + "Disjoint blocks database. " + "Hash chain may not be valid. " + "Found orphan blocks: ~p, " + "Hash chain length: ~p.", + [Orphans, length(HashChain)] + ) + end, + HashChain. --spec build_hash_chain(blockchain_block:hash(), blockchain_block:block(), blockchain(), rocksdb:cf_handle()) -> [blockchain_block:hash(), ...]. -build_hash_chain(StopHash,StartingBlock, Blockchain, CF) -> - BlockHash = blockchain_block:hash_block(StartingBlock), - ParentHash = blockchain_block:prev_hash(StartingBlock), - build_hash_chain_(StopHash, CF, Blockchain, [ParentHash, BlockHash]). +-spec find_orphans(#{B => B}) -> [B] when B :: binary(). +find_orphans(ChildToParent) -> + %% Given the pairs: #{A => 0, C => B} + %% and assuming the chain: [0, A, B, C] + %% we can see that B=>A is missing. + %% + %% Since we cannot actually assume, we can find non-genesis-parents which + %% have no parents themselves. In above example that would be B. Implying + %% the chain is disjoint. + [ + Parent + || + {_, Parent} <- maps:to_list(ChildToParent), + maps:find(Parent, ChildToParent) =:= error, % Parent has no parent. + lists:sum(binary_to_list(Parent)) > 0 % Parent is non-genesis. + ]. + +-spec trace_lineage(#{A => A}, A, A) -> [A, ...]. +trace_lineage(Parents, Oldest, Youngest) -> + (fun TraceBack ([Child | _]=Lineage) -> + case maps:find(Child, Parents) of + error -> Lineage; + {ok, Oldest} -> Lineage; % Oldest ancestor is excluded. + {ok, Parent} -> TraceBack([Parent | Lineage]) + end + end)([Youngest]). + +-spec rocksdb_stream(rocksdb:db_handle(), rocksdb:cf_handle()) -> + data_stream:t({K :: binary(), V :: binary()}). 
+rocksdb_stream(DB, CF) -> + fun () -> + case rocksdb:iterator(DB, CF, []) of + {error, Reason} -> + error({rocks_key_streaming_failure, Reason}); + {ok, Iter} -> + case rocksdb:iterator_move(Iter, first) of + {ok, K, V} -> + {some, {{K, V}, rocksdb_stream(Iter)}}; + Error -> + error({rocks_key_streaming_failure, Error}) + end + end + end. --spec build_hash_chain_(blockchain_block:hash(), rocksdb:cf_handle(), blockchain(), [blockchain_block:hash(), ...]) -> [blockchain_block:hash()]. -build_hash_chain_(StopHash, CF, Blockchain = #blockchain{db=DB}, [ParentHash|Tail]=Acc) -> - case ParentHash == StopHash of - true -> - %% reached the end - Tail; - false -> - case rocksdb:get(DB, CF, ParentHash, []) of - {ok, BinBlock} -> - build_hash_chain_(StopHash, CF, Blockchain, [blockchain_block:prev_hash(blockchain_block:deserialize(BinBlock))|Acc]); - _ -> - Acc - end +-spec rocksdb_stream(rocksdb:itr_handle()) -> + data_stream:t({K :: binary(), V :: binary()}). +rocksdb_stream(Iter) -> + fun () -> + case rocksdb:iterator_move(Iter, next) of + {ok, K, V} -> + {some, {{K, V}, rocksdb_stream(Iter)}}; + {error, invalid_iterator} -> + none; + {error, Reason} -> + error({rocks_key_streaming_failure, Reason}) + end end. -spec fold_chain(fun((Blk :: blockchain_block:block(), AccIn :: any()) -> NewAcc :: any()), @@ -3356,4 +3446,62 @@ block_info_upgrade_test() -> V2BlockInfo = upgrade_block_info(V1BlockInfo, Block, Chain), ?assertMatch(ExpV2BlockInfo, V2BlockInfo). +trace_lineage_test_() -> + [ + ?_assertEqual( + [middle, youngest], + trace_lineage( + #{ + youngest => middle, + middle => oldest + }, + oldest, + youngest + ) + ), + ?_assertEqual( + [b, c, d, e], + trace_lineage( + #{ + e => d, + d => c, + c => b, + b => a + }, + a, + e + ) + ) + ]. + +find_orphans_test_() -> + [ + ?_assertEqual( + %% [0, A, B, C] + %% #{A => 0, C => B} + %% missing B=>A, so B is orphan. 
+ [<<"B">>], + find_orphans( + #{ + <<"A">> => <<0>>, + <<"C">> => <<"B">> + } + ) + ), + ?_assertEqual( + %% [X, A, B, C] + %% #{A => X, C => B} + %% missing B=>A, + %% X is not genesis, + %% so both X and B are orphans. + [<<"X">>, <<"B">>], + find_orphans( + #{ + <<"A">> => <<"X">>, + <<"C">> => <<"B">> + } + ) + ) + ]. + -endif. diff --git a/src/blockchain_utils.erl b/src/blockchain_utils.erl index f4a9f88dc4..c471c45530 100644 --- a/src/blockchain_utils.erl +++ b/src/blockchain_utils.erl @@ -10,6 +10,7 @@ -include("blockchain_vars.hrl"). -export([ + cpus/0, shuffle_from_hash/2, shuffle/1, rand_from_hash/1, rand_state/1, @@ -290,6 +291,7 @@ validation_width() -> N end. +-spec cpus() -> non_neg_integer(). cpus() -> Ct = erlang:system_info(schedulers_online), max(2, ceil(Ct/2) + 1). diff --git a/src/data/data_stream.erl b/src/data/data_stream.erl new file mode 100644 index 0000000000..b0d2cfab65 --- /dev/null +++ b/src/data/data_stream.erl @@ -0,0 +1,249 @@ +-module(data_stream). + +-export_type([ + t/1 +]). + +-export([ + next/1, + from_list/1, + to_list/1, + iter/2, + pmap_to_bag/2, + pmap_to_bag/3 + %% TODO map + %% TODO fold +]). + +-record(sched, { + id :: reference(), + producers :: [{pid(), reference()}], + consumers :: [{pid(), reference()}], + consumers_free :: [pid()], % available to work. + work :: [any()], % received from producers. + results :: [any()] % received from consumers. +}). + +%% API ======================================================================== + +-type t(A) :: fun(() -> none | {some, {A, t(A)}}). + +-spec next(t(A)) -> none | {some, {A, t(A)}}. +next(T) when is_function(T) -> + T(). + +-spec iter(fun((A) -> ok), t(A)) -> ok. +iter(F, T0) -> + case next(T0) of + none -> + ok; + {some, {X, T1}} -> + F(X), + iter(F, T1) + end. + +-spec from_list([A]) -> t(A). +from_list([]) -> + fun () -> none end; +from_list([X | Xs]) -> + fun () -> {some, {X, from_list(Xs)}} end. + +-spec to_list(t(A)) -> [A]. 
+to_list(T0) when is_function(T0) -> + case next(T0) of + none -> + []; + {some, {X, T1}} -> + [X | to_list(T1)] + end. + +%% A pmap which doesn't preserve order. +-spec pmap_to_bag(t(A), fun((A) -> B)) -> [B]. +pmap_to_bag(Xs, F) when is_function(Xs), is_function(F) -> + pmap_to_bag(Xs, F, blockchain_utils:cpus()). + +-spec pmap_to_bag(t(A), fun((A) -> B), non_neg_integer()) -> [B]. +pmap_to_bag(T, F, J) when is_function(T), is_function(F), is_integer(J), J > 0 -> + CallerPid = self(), + SchedID = make_ref(), + Scheduler = + fun () -> + SchedPid = self(), + Consumer = + fun Consume () -> + ConsumerPid = self(), + SchedPid ! {SchedID, consumer_ready, ConsumerPid}, + receive + {SchedID, job, X} -> + Y = F(X), + SchedPid ! {SchedID, consumer_output, Y}, + Consume(); + {SchedID, done} -> + ok + end + end, + Producer = + fun () -> + %% XXX Producer is racing against consumers. + %% + %% This hasn't (yet) caused a problem, but in theory it is + %% bad: producer is pouring into the scheduler's queue as + %% fast as possible, potentially faster than consumers can + %% pull from it, so heap usage could explode. + %% + %% Solution ideas: + %% A. have the scheduler call the producer whenever more + %% work is asked for, but ... that can block the + %% scheduler, starving consumers; + %% B. produce in (configurable size) batches, pausing + %% production when batch is full and resuming when not + %% (this is probably the way to go). + ok = iter(fun (X) -> SchedPid ! {SchedID, producer_output, X} end, T) + end, + Ys = + sched(#sched{ + id = SchedID, + producers = [spawn_monitor(Producer)], + consumers = [spawn_monitor(Consumer) || _ <- lists:duplicate(J, {})], + consumers_free = [], + work = [], + results = [] + }), + CallerPid ! {SchedID, Ys} + end, + %% XXX Scheduling from a dedicated process to avoid conflating our 'DOWN' + %% messages (from producers and consumers) with those of the caller + %% process. 
+ {SchedPid, SchedMonRef} = spawn_monitor(Scheduler), + %% TODO timeout? + receive + {SchedID, Ys} -> + receive + {'DOWN', SchedMonRef, process, SchedPid, normal} -> + Ys + end; + {'DOWN', SchedMonRef, process, SchedPid, Reason} -> + error({data_stream_scheduler_crashed_before_sending_results, Reason}) + end. + +%% Internal =================================================================== + +-spec sched(#sched{}) -> [any()]. +sched(#sched{id=_, producers=[], consumers=[], consumers_free=[], work=[], results=Ys}) -> + Ys; +sched(#sched{id=ID, producers=[], consumers=[_|_], consumers_free=[_|_]=CsFree, work=[]}=S0) -> + _ = [C ! {ID, done} || C <- CsFree], + sched(S0#sched{consumers_free=[]}); +sched(#sched{id=_, producers=_, consumers=[_|_], consumers_free=[_|_], work=[_|_]}=S0) -> + S1 = sched_assign(S0), + sched(S1); +sched(#sched{id=ID, producers=Ps, consumers=_, consumers_free=CsFree, work=Xs, results=Ys }=S) -> + receive + {ID, producer_output, X} -> sched(S#sched{work=[X | Xs]}); + {ID, consumer_output, Y} -> sched(S#sched{results=[Y | Ys]}); + {ID, consumer_ready, C} -> sched(S#sched{consumers_free=[C | CsFree]}); + {'DOWN', MonRef, process, Pid, normal} -> + S1 = sched_remove_worker(S, {Pid, MonRef}), + sched(S1); + {'DOWN', MonRef, process, Pid, Reason} -> + case lists:member({Pid, MonRef}, Ps) of + true -> error({?MODULE, pmap_to_bag, producer_crash, Reason}); + false -> error({?MODULE, pmap_to_bag, consumer_crash, Reason}) + end + end. + +-spec sched_remove_worker(#sched{}, {pid(), reference()}) -> #sched{}. +sched_remove_worker(#sched{producers=Ps, consumers=Cs, consumers_free=CsFree}=S, {Pid, _}=PidRef) -> + case lists:member(PidRef, Ps) of + true -> + S#sched{producers = Ps -- [PidRef]}; + false -> + S#sched{ + consumers = Cs -- [PidRef], + consumers_free = CsFree -- [Pid] + } + end. + +-spec sched_assign(#sched{}) -> #sched{}. 
+sched_assign(#sched{consumers_free=[], work=Xs}=S) -> S#sched{consumers_free=[], work=Xs}; +sched_assign(#sched{consumers_free=Cs, work=[]}=S) -> S#sched{consumers_free=Cs, work=[]}; +sched_assign(#sched{consumers_free=[C | Cs], work=[X | Xs], id=ID}=S) -> + C ! {ID, job, X}, + sched_assign(S#sched{consumers_free=Cs, work=Xs}). + +%% Tests ====================================================================== + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +pmap_to_bag_test_() -> + NonDeterminism = fun (N) -> timer:sleep(rand:uniform(N)) end, + FromListWithNonDeterminism = + fun (N) -> + fun Stream (Xs) -> + fun () -> + case Xs of + [] -> + none; + [X | Xs1] -> + NonDeterminism(N), + {some, {X, Stream(Xs1)}} + end + end + end + end, + Tests = + [ + begin + G = fun (X) -> NonDeterminism(ConsumerDelay), F(X) end, + Test = + ?_assertEqual( + lists:sort(lists:map(G, Xs)), + lists:sort(pmap_to_bag( + (FromListWithNonDeterminism(ProducerDelay))(Xs), + G, + J + )) + ), + Timeout = 1000 + ProducerDelay + (ConsumerDelay * J), + Name = lists:flatten(io_lib:format( + "#Xs: ~p, J: ~p, ProducerDelay: ~p, ConsumerDelay: ~p, Timeout: ~p", + [length(Xs), J, ProducerDelay, ConsumerDelay, Timeout] + )), + {Name, {timeout, Timeout, Test}} + end + || + J <- lists:seq(1, 16), + F <- [ + fun (X) -> {X, X} end, + fun (X) -> X * 2 end + ], + Xs <- [ + lists:seq(1, 100) + ], + {ProducerDelay, ConsumerDelay} <- + begin + Lo = 1, + Hi = 10, + [ + {Hi, Lo}, % slow producer, fast consumer + {Lo, Hi}, % fast producer, slow consumer + {Lo, Lo}, % both fast + {Hi, Hi} % both slow + ] + end + ], + {inparallel, Tests}. + +round_trip_test_() -> + [ + ?_assertEqual(Xs, to_list(from_list(Xs))) + || + Xs <- [ + [1, 2, 3], + [a, b, c], + [<<>>, <<"foo">>, <<"bar">>, <<"baz">>, <<"qux">>] + ] + ]. + +-endif.