From 988ade6ae65c2e3e89d2077865876fd32e398849 Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Tue, 10 May 2022 10:12:27 -0400 Subject: [PATCH 1/8] Implement stream with a parallel map to a bag --- .gitignore | 1 + src/blockchain_utils.erl | 2 + src/data/data_stream.erl | 235 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 238 insertions(+) create mode 100644 src/data/data_stream.erl diff --git a/.gitignore b/.gitignore index b1a1f24f43..811033d910 100644 --- a/.gitignore +++ b/.gitignore @@ -17,6 +17,7 @@ _build *.iml rebar3.crashdump data/ +!src/data/ .DS_Store src/pb/ src/grpc/autogen diff --git a/src/blockchain_utils.erl b/src/blockchain_utils.erl index f4a9f88dc4..c471c45530 100644 --- a/src/blockchain_utils.erl +++ b/src/blockchain_utils.erl @@ -10,6 +10,7 @@ -include("blockchain_vars.hrl"). -export([ + cpus/0, shuffle_from_hash/2, shuffle/1, rand_from_hash/1, rand_state/1, @@ -290,6 +291,7 @@ validation_width() -> N end. +-spec cpus() -> non_neg_integer(). cpus() -> Ct = erlang:system_info(schedulers_online), max(2, ceil(Ct/2) + 1). diff --git a/src/data/data_stream.erl b/src/data/data_stream.erl new file mode 100644 index 0000000000..fdbdaa66dd --- /dev/null +++ b/src/data/data_stream.erl @@ -0,0 +1,235 @@ +-module(data_stream). + +-export_type([ + t/1 +]). + +-export([ + next/1, + from_list/1, + to_list/1, + iter/2, + pmap_to_bag/2, + pmap_to_bag/3 + %% TODO map + %% TODO fold +]). + +-record(sched, { + id :: reference(), + ps_up :: [{pid(), reference()}], % producers up. + cs_up :: [{pid(), reference()}], % consumers up. + cs_free :: [pid()], % consumers available to work. + xs :: [any()], % inputs. received from producers. + ys :: [any()] % outputs received from consumers. +}). + +%% API ======================================================================== + +-type t(A) :: fun(() -> none | {some, {A, t(A)}}). + +-spec next(t(A)) -> none | {some, {A, t(A)}}. +next(T) when is_function(T) -> + T(). + +-spec iter(fun((A) -> ok), t(A)) -> ok. 
+iter(F, T0) -> + case next(T0) of + none -> + ok; + {some, {X, T1}} -> + F(X), + iter(F, T1) + end. + +-spec from_list([A]) -> t(A). +from_list([]) -> + fun () -> none end; +from_list([X | Xs]) -> + fun () -> {some, {X, from_list(Xs)}} end. + +-spec to_list(t(A)) -> [A]. +to_list(T0) when is_function(T0) -> + case next(T0) of + none -> + []; + {some, {X, T1}} -> + [X | to_list(T1)] + end. + +%% A pmap which doesn't preserve order. +-spec pmap_to_bag(t(A), fun((A) -> B)) -> [B]. +pmap_to_bag(Xs, F) when is_function(Xs), is_function(F) -> + pmap_to_bag(Xs, F, blockchain_utils:cpus()). + +-spec pmap_to_bag(t(A), fun((A) -> B), non_neg_integer()) -> [B]. +pmap_to_bag(T, F, J) when is_function(T), is_function(F), is_integer(J), J > 0 -> + CallerPid = self(), + SchedID = make_ref(), + Scheduler = + fun () -> + SchedPid = self(), + Consumer = + fun Work () -> + ConsumerPid = self(), + SchedPid ! {SchedID, consumer_ready, ConsumerPid}, + receive + {SchedID, job, X} -> + Y = F(X), + SchedPid ! {SchedID, consumer_output, Y}, + Work(); + {SchedID, done} -> + ok + end + end, + Producer = + fun () -> + ok = iter(fun (X) -> SchedPid ! {SchedID, producer_output, X} end, T) + end, + Ys = + sched(#sched{ + id = SchedID, + ps_up = [spawn_monitor(Producer)], + cs_up = [spawn_monitor(Consumer) || _ <- lists:duplicate(J, {})], + cs_free = [], + xs = [], + ys = [] + }), + CallerPid ! {SchedID, Ys} + end, + %% XXX Scheduling from a dedicated process to avoid conflating our 'DOWN' + %% messages (from producers and consumers) with those of the caller + %% process. + {SchedPid, SchedMonRef} = spawn_monitor(Scheduler), + %% TODO timeout? + receive + {SchedID, Ys} -> + receive + {'DOWN', SchedMonRef, process, SchedPid, normal} -> + Ys + end; + {'DOWN', SchedMonRef, process, SchedPid, Reason} -> + error({data_stream_scheduler_crashed_before_sending_results, Reason}) + end. + +%% Internal =================================================================== + +-spec sched(#sched{}) -> [any()]. 
+sched(#sched{id=_, ps_up=[], cs_up=[], cs_free=[], xs=[], ys=Ys}) -> + Ys; +sched(#sched{id=ID, ps_up=[], cs_up=[_|_], cs_free=[_|_]=CsFree, xs=[]}=S0) -> + _ = [C ! {ID, done} || C <- CsFree], + sched(S0#sched{cs_free=[]}); +sched(#sched{id=_, ps_up=_, cs_up=[_|_], cs_free=[_|_], xs=[_|_]}=S0) -> + S1 = sched_assign(S0), + sched(S1); +sched(#sched{id=ID, ps_up=Ps, cs_up=_, cs_free=CsFree, xs=Xs, ys=Ys }=S) -> + receive + {ID, producer_output, X} -> sched(S#sched{xs=[X | Xs]}); + {ID, consumer_output, Y} -> sched(S#sched{ys=[Y | Ys]}); + {ID, consumer_ready, C} -> sched(S#sched{cs_free=[C | CsFree]}); + {'DOWN', MonRef, process, Pid, normal} -> + S1 = sched_remove_worker(S, {Pid, MonRef}), + sched(S1); + {'DOWN', MonRef, process, Pid, Reason} -> + case lists:member({Pid, MonRef}, Ps) of + true -> error({?MODULE, pmap_to_bag, producer_crash, Reason}); + false -> error({?MODULE, pmap_to_bag, consumer_crash, Reason}) + end + end. + +-spec sched_remove_worker(#sched{}, {pid(), reference()}) -> #sched{}. +sched_remove_worker(#sched{ps_up=Ps, cs_up=Cs, cs_free=CsFree}=S, {Pid, _}=PidRef) -> + case lists:member(PidRef, Ps) of + true -> + S#sched{ps_up = Ps -- [PidRef]}; + false -> + S#sched{ + cs_up = Cs -- [PidRef], + cs_free = CsFree -- [Pid] + } + end. + +-spec sched_assign(#sched{}) -> #sched{}. +sched_assign(#sched{cs_free=[], xs=Xs}=S) -> S#sched{cs_free=[], xs=Xs}; +sched_assign(#sched{cs_free=Cs, xs=[]}=S) -> S#sched{cs_free=Cs, xs=[]}; +sched_assign(#sched{cs_free=[C | Cs], xs=[X | Xs], id=ID}=S) -> + C ! {ID, job, X}, + sched_assign(S#sched{cs_free=Cs, xs=Xs}). + +%% Tests ====================================================================== + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). 
+ +pmap_to_bag_test_() -> + NonDeterminism = fun (N) -> timer:sleep(rand:uniform(N)) end, + FromListWithNonDeterminism = + fun (N) -> + fun Stream (Xs) -> + fun () -> + case Xs of + [] -> + none; + [X | Xs1] -> + NonDeterminism(N), + {some, {X, Stream(Xs1)}} + end + end + end + end, + Tests = + [ + begin + G = fun (X) -> NonDeterminism(ConsumerDelay), F(X) end, + Test = + ?_assertEqual( + lists:sort(lists:map(G, Xs)), + lists:sort(pmap_to_bag( + (FromListWithNonDeterminism(ProducerDelay))(Xs), + G, + J + )) + ), + Timeout = 1000 + ProducerDelay + (ConsumerDelay * J), + Name = lists:flatten(io_lib:format( + "#Xs: ~p, J: ~p, ProducerDelay: ~p, ConsumerDelay: ~p, Timeout: ~p", + [length(Xs), J, ProducerDelay, ConsumerDelay, Timeout] + )), + {Name, {timeout, Timeout, Test}} + end + || + J <- lists:seq(1, 16), + F <- [ + fun (X) -> {X, X} end, + fun (X) -> X * 2 end + ], + Xs <- [ + lists:seq(1, 100) + ], + {ProducerDelay, ConsumerDelay} <- + begin + Lo = 1, + Hi = 10, + [ + {Hi, Lo}, % slow producer, fast consumer + {Lo, Hi}, % fast producer, slow consumer + {Lo, Lo}, % both fast + {Hi, Hi} % both slow + ] + end + ], + {inparallel, Tests}. + +round_trip_test_() -> + [ + ?_assertEqual(Xs, to_list(from_list(Xs))) + || + Xs <- [ + [1, 2, 3], + [a, b, c], + [<<>>, <<"foo">>, <<"bar">>, <<"baz">>, <<"qux">>] + ] + ]. + +-endif. From 9bfd163db1c77d3b5fc8a9db604bd0f4db285163 Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Tue, 10 May 2022 10:14:28 -0400 Subject: [PATCH 2/8] Parallelize build_hash_chain The main idea is to 1. asynchronously: enumerate all `{K, V}` pairs in the blocks CF; 2. in-parallel: 2.1 deserialize blocks; 2.2 lookup parents; 2.3 accumulate `{Parent, Child}` pairs; 3. in-serial: 3.1 build graph from above-built pairs; 3.2 walk backwards from the youngest given hash and trace its longest possible lineage up to the oldest given hash. 
The last step, 3.2, is essentially what the previous implementation (that this one replaces) used to do, but this time all the expensive deserialization has already been done, in parallel. --- src/blockchain.erl | 106 +++++++++++++++++++++++++++++++++++++-------- 1 file changed, 87 insertions(+), 19 deletions(-) diff --git a/src/blockchain.erl b/src/blockchain.erl index 4268551ef5..644e450679 100644 --- a/src/blockchain.erl +++ b/src/blockchain.erl @@ -1460,26 +1460,94 @@ build(Height, Blockchain, N, Acc) -> end end. - --spec build_hash_chain(blockchain_block:hash(), blockchain_block:block(), blockchain(), rocksdb:cf_handle()) -> [blockchain_block:hash(), ...]. -build_hash_chain(StopHash,StartingBlock, Blockchain, CF) -> - BlockHash = blockchain_block:hash_block(StartingBlock), - ParentHash = blockchain_block:prev_hash(StartingBlock), - build_hash_chain_(StopHash, CF, Blockchain, [ParentHash, BlockHash]). - --spec build_hash_chain_(blockchain_block:hash(), rocksdb:cf_handle(), blockchain(), [blockchain_block:hash(), ...]) -> [blockchain_block:hash()]. -build_hash_chain_(StopHash, CF, Blockchain = #blockchain{db=DB}, [ParentHash|Tail]=Acc) -> - case ParentHash == StopHash of - true -> - %% reached the end - Tail; - false -> - case rocksdb:get(DB, CF, ParentHash, []) of - {ok, BinBlock} -> - build_hash_chain_(StopHash, CF, Blockchain, [blockchain_block:prev_hash(blockchain_block:deserialize(BinBlock))|Acc]); - _ -> - Acc +-spec build_hash_chain( + H, + blockchain_block:block(), + blockchain(), + rocksdb:cf_handle() +) -> + [H] when H :: blockchain_block:hash(). +build_hash_chain(StopHash, StartBlock, #blockchain{db=DB}, CF) -> + StartHash = blockchain_block:hash_block(StartBlock), + build_hash_chain_in_parallel(DB, CF, StopHash, StartHash). + +-spec build_hash_chain_in_parallel( + rocksdb:db_handle(), + rocksdb:cf_handle(), + H, + H +) -> + [H] when H :: blockchain_block:hash(). 
+build_hash_chain_in_parallel(DB, CF, Oldest, Youngest) -> + Relations = digraph:new([cyclic]), % XXX 'cyclic' is a perf compromise: + %% 'acyclic' is what we'd ideally want, but it is an expensive option, as + %% it forces a cycles check ON EACH add_edge operation, so by using + %% 'cyclic' we're just trusting that our prev_hash relationships are all + %% correct, which they should be, if not - we have bigger problems. + Edges = + data_stream:pmap_to_bag( + rocksdb_stream(DB, CF), + fun ({<<ChildHash/binary>>, <<ChildBlockBin/binary>>}) -> + ChildBlock = blockchain_block:deserialize(ChildBlockBin), + <<ParentHash/binary>> = blockchain_block:prev_hash(ChildBlock), + %% XXX Can't update digraph in parallel - no concurrent writes. + {ParentHash, ChildHash} end + ), + _ = [digraph:add_vertex(Relations, Child) || {_ , Child} <- Edges], + _ = [digraph:add_edge(Relations, Parent, Child) || {Parent, Child} <- Edges], + HashChain = + %% XXX The simplest solution: + %% digraph:get_path(Relations, Oldest, Youngest) + %% is fine in the ideal case, but doesn't work in the case of a + %% broken path (when a block was missing and its parent could not + %% be looked up), in which case we need to return the longest + %% lineage found, traced back from the youngest hash. TraceBack + %% works for either scenario. + (fun TraceBack ([Child | _]=Lineage) -> + case digraph:in_neighbours(Relations, Child) of + [] -> + Lineage; + [Oldest] -> + Lineage; + [Parent] -> + TraceBack([Parent | Lineage]); + [_|_]=Parents -> + error({incorrect_chain, multiple_parents, Child, Parents}) + end + end)([Youngest]), + true = digraph:delete(Relations), + HashChain. + +-spec rocksdb_stream(rocksdb:db_handle(), rocksdb:cf_handle()) -> + data_stream:t({K :: binary(), V :: binary()}).
+rocksdb_stream(DB, CF) -> + fun () -> + case rocksdb:iterator(DB, CF, []) of + {error, Reason} -> + error({rocks_key_streaming_failure, Reason}); + {ok, Iter} -> + case rocksdb:iterator_move(Iter, first) of + {ok, K, V} -> + {some, {{K, V}, rocksdb_stream(Iter)}}; + Error -> + error({rocks_key_streaming_failure, Error}) + end + end + end. + +-spec rocksdb_stream(rocksdb:itr_handle()) -> + data_stream:t({K :: binary(), V :: binary()}). +rocksdb_stream(Iter) -> + fun () -> + case rocksdb:iterator_move(Iter, next) of + {ok, K, V} -> + {some, {{K, V}, rocksdb_stream(Iter)}}; + {error, invalid_iterator} -> + none; + {error, Reason} -> + error({rocks_key_streaming_failure, Reason}) + end end. -spec fold_chain(fun((Blk :: blockchain_block:block(), AccIn :: any()) -> NewAcc :: any()), From 1c0344e17be790c22fc7aeb2d8bf913bc486f262 Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Sat, 14 May 2022 11:01:58 -0400 Subject: [PATCH 3/8] Simplify hash relations structure after moving from topsorting to trace-back - I realized that things are much simpler now - duh! Bonus - this is also about 30 seconds faster on my machine. --- src/blockchain.erl | 55 ++++++++++++++-------------------------------- 1 file changed, 17 insertions(+), 38 deletions(-) diff --git a/src/blockchain.erl b/src/blockchain.erl index 644e450679..ce7d6fbb03 100644 --- a/src/blockchain.erl +++ b/src/blockchain.erl @@ -1479,45 +1479,24 @@ build_hash_chain(StopHash, StartBlock, #blockchain{db=DB}, CF) -> ) -> [H] when H :: blockchain_block:hash(). build_hash_chain_in_parallel(DB, CF, Oldest, Youngest) -> - Relations = digraph:new([cyclic]), % XXX 'cyclic' is a perf compromise: - %% 'acyclic' is what we'd ideally want, but it is an expensive option, as - %% it forces a cycles check ON EACH add_edge operation, so by using - %% 'cyclic' we're just trusting that our prev_hash relationships are all - %% correct, which they should be, if not - we have bigger problems. 
- Edges = - data_stream:pmap_to_bag( - rocksdb_stream(DB, CF), - fun ({<<ChildHash/binary>>, <<ChildBlockBin/binary>>}) -> - ChildBlock = blockchain_block:deserialize(ChildBlockBin), - <<ParentHash/binary>> = blockchain_block:prev_hash(ChildBlock), - %% XXX Can't update digraph in parallel - no concurrent writes. - {ParentHash, ChildHash} - end + Parents = + maps:from_list( + data_stream:pmap_to_bag( + rocksdb_stream(DB, CF), + fun ({<<ChildHash/binary>>, <<ChildBlockBin/binary>>}) -> + ChildBlock = blockchain_block:deserialize(ChildBlockBin), + <<ParentHash/binary>> = blockchain_block:prev_hash(ChildBlock), + {ChildHash, ParentHash} + end + ) ), - _ = [digraph:add_vertex(Relations, Child) || {_ , Child} <- Edges], - _ = [digraph:add_edge(Relations, Parent, Child) || {Parent, Child} <- Edges], - HashChain = - %% XXX The simplest solution: - %% digraph:get_path(Relations, Oldest, Youngest) - %% is fine in the ideal case, but doesn't work in the case of a - %% broken path (when a block was missing and its parent could not - %% be looked up), in which case we need to return the longest - %% lineage found, traced back from the youngest hash. TraceBack - %% works for either scenario. - (fun TraceBack ([Child | _]=Lineage) -> - case digraph:in_neighbours(Relations, Child) of - [] -> - Lineage; - [Oldest] -> - Lineage; - [Parent] -> - TraceBack([Parent | Lineage]); - [_|_]=Parents -> - error({incorrect_chain, multiple_parents, Child, Parents}) - end - end)([Youngest]), - true = digraph:delete(Relations), - HashChain. + (fun TraceBack ([Child | _]=Lineage) -> + case maps:find(Child, Parents) of + error -> Lineage; + {ok, Oldest} -> Lineage; + {ok, Parent} -> TraceBack([Parent | Lineage]) + end + end)([Youngest]). -spec rocksdb_stream(rocksdb:db_handle(), rocksdb:cf_handle()) -> data_stream:t({K :: binary(), V :: binary()}).
From 4a55a8d5989303d4ae8c69a887c2df150e4085c5 Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Sat, 14 May 2022 12:38:21 -0400 Subject: [PATCH 4/8] Test the lineage trace --- src/blockchain.erl | 46 ++++++++++++++++++++++++++++++++++------------ 1 file changed, 34 insertions(+), 12 deletions(-) diff --git a/src/blockchain.erl b/src/blockchain.erl index ce7d6fbb03..0596b4fb47 100644 --- a/src/blockchain.erl +++ b/src/blockchain.erl @@ -1466,19 +1466,8 @@ build(Height, Blockchain, N, Acc) -> blockchain(), rocksdb:cf_handle() ) -> - [H] when H :: blockchain_block:hash(). + [H, ...] when H :: blockchain_block:hash(). build_hash_chain(StopHash, StartBlock, #blockchain{db=DB}, CF) -> - StartHash = blockchain_block:hash_block(StartBlock), - build_hash_chain_in_parallel(DB, CF, StopHash, StartHash). - --spec build_hash_chain_in_parallel( - rocksdb:db_handle(), - rocksdb:cf_handle(), - H, - H -) -> - [H] when H :: blockchain_block:hash(). -build_hash_chain_in_parallel(DB, CF, Oldest, Youngest) -> Parents = maps:from_list( data_stream:pmap_to_bag( @@ -1490,6 +1479,11 @@ build_hash_chain_in_parallel(DB, CF, Oldest, Youngest) -> end ) ), + StartHash = blockchain_block:hash_block(StartBlock), + trace_lineage(Parents, StopHash, StartHash). + +-spec trace_lineage(#{A => A}, A, A) -> [A, ...]. +trace_lineage(Parents, Oldest, Youngest) -> (fun TraceBack ([Child | _]=Lineage) -> case maps:find(Child, Parents) of error -> Lineage; @@ -3403,4 +3397,32 @@ block_info_upgrade_test() -> V2BlockInfo = upgrade_block_info(V1BlockInfo, Block, Chain), ?assertMatch(ExpV2BlockInfo, V2BlockInfo). +trace_lineage_test_() -> + [ + ?_assertEqual( + [middle, youngest], + trace_lineage( + #{ + youngest => middle, + middle => oldest + }, + oldest, + youngest + ) + ), + ?_assertEqual( + [b, c, d, e], + trace_lineage( + #{ + e => d, + d => c, + c => b, + b => a + }, + a, + e + ) + ) + ]. + -endif. 
From c1ae2b74a2dcaa62c2081db4a62dabe1ebb3af03 Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Sat, 14 May 2022 13:11:30 -0400 Subject: [PATCH 5/8] Use more-intuitive field names --- src/data/data_stream.erl | 60 ++++++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/src/data/data_stream.erl b/src/data/data_stream.erl index fdbdaa66dd..cde0a2a127 100644 --- a/src/data/data_stream.erl +++ b/src/data/data_stream.erl @@ -16,12 +16,12 @@ ]). -record(sched, { - id :: reference(), - ps_up :: [{pid(), reference()}], % producers up. - cs_up :: [{pid(), reference()}], % consumers up. - cs_free :: [pid()], % consumers available to work. - xs :: [any()], % inputs. received from producers. - ys :: [any()] % outputs received from consumers. + id :: reference(), + producers :: [{pid(), reference()}], + consumers :: [{pid(), reference()}], + consumers_free :: [pid()], % available to work. + work :: [any()], % received from producers. + results :: [any()] % received from consumers. }). %% API ======================================================================== @@ -70,14 +70,14 @@ pmap_to_bag(T, F, J) when is_function(T), is_function(F), is_integer(J), J > 0 - fun () -> SchedPid = self(), Consumer = - fun Work () -> + fun Consume () -> ConsumerPid = self(), SchedPid ! {SchedID, consumer_ready, ConsumerPid}, receive {SchedID, job, X} -> Y = F(X), SchedPid ! {SchedID, consumer_output, Y}, - Work(); + Consume(); {SchedID, done} -> ok end @@ -88,12 +88,12 @@ pmap_to_bag(T, F, J) when is_function(T), is_function(F), is_integer(J), J > 0 - end, Ys = sched(#sched{ - id = SchedID, - ps_up = [spawn_monitor(Producer)], - cs_up = [spawn_monitor(Consumer) || _ <- lists:duplicate(J, {})], - cs_free = [], - xs = [], - ys = [] + id = SchedID, + producers = [spawn_monitor(Producer)], + consumers = [spawn_monitor(Consumer) || _ <- lists:duplicate(J, {})], + consumers_free = [], + work = [], + results = [] }), CallerPid ! 
{SchedID, Ys} end, @@ -115,19 +115,19 @@ pmap_to_bag(T, F, J) when is_function(T), is_function(F), is_integer(J), J > 0 - %% Internal =================================================================== -spec sched(#sched{}) -> [any()]. -sched(#sched{id=_, ps_up=[], cs_up=[], cs_free=[], xs=[], ys=Ys}) -> +sched(#sched{id=_, producers=[], consumers=[], consumers_free=[], work=[], results=Ys}) -> Ys; -sched(#sched{id=ID, ps_up=[], cs_up=[_|_], cs_free=[_|_]=CsFree, xs=[]}=S0) -> +sched(#sched{id=ID, producers=[], consumers=[_|_], consumers_free=[_|_]=CsFree, work=[]}=S0) -> _ = [C ! {ID, done} || C <- CsFree], - sched(S0#sched{cs_free=[]}); -sched(#sched{id=_, ps_up=_, cs_up=[_|_], cs_free=[_|_], xs=[_|_]}=S0) -> + sched(S0#sched{consumers_free=[]}); +sched(#sched{id=_, producers=_, consumers=[_|_], consumers_free=[_|_], work=[_|_]}=S0) -> S1 = sched_assign(S0), sched(S1); -sched(#sched{id=ID, ps_up=Ps, cs_up=_, cs_free=CsFree, xs=Xs, ys=Ys }=S) -> +sched(#sched{id=ID, producers=Ps, consumers=_, consumers_free=CsFree, work=Xs, results=Ys }=S) -> receive - {ID, producer_output, X} -> sched(S#sched{xs=[X | Xs]}); - {ID, consumer_output, Y} -> sched(S#sched{ys=[Y | Ys]}); - {ID, consumer_ready, C} -> sched(S#sched{cs_free=[C | CsFree]}); + {ID, producer_output, X} -> sched(S#sched{work=[X | Xs]}); + {ID, consumer_output, Y} -> sched(S#sched{results=[Y | Ys]}); + {ID, consumer_ready, C} -> sched(S#sched{consumers_free=[C | CsFree]}); {'DOWN', MonRef, process, Pid, normal} -> S1 = sched_remove_worker(S, {Pid, MonRef}), sched(S1); @@ -139,23 +139,23 @@ sched(#sched{id=ID, ps_up=Ps, cs_up=_, cs_free=CsFree, xs=Xs, ys=Ys }=S) -> end. -spec sched_remove_worker(#sched{}, {pid(), reference()}) -> #sched{}. 
-sched_remove_worker(#sched{ps_up=Ps, cs_up=Cs, cs_free=CsFree}=S, {Pid, _}=PidRef) -> +sched_remove_worker(#sched{producers=Ps, consumers=Cs, consumers_free=CsFree}=S, {Pid, _}=PidRef) -> case lists:member(PidRef, Ps) of true -> - S#sched{ps_up = Ps -- [PidRef]}; + S#sched{producers = Ps -- [PidRef]}; false -> S#sched{ - cs_up = Cs -- [PidRef], - cs_free = CsFree -- [Pid] + consumers = Cs -- [PidRef], + consumers_free = CsFree -- [Pid] } end. -spec sched_assign(#sched{}) -> #sched{}. -sched_assign(#sched{cs_free=[], xs=Xs}=S) -> S#sched{cs_free=[], xs=Xs}; -sched_assign(#sched{cs_free=Cs, xs=[]}=S) -> S#sched{cs_free=Cs, xs=[]}; -sched_assign(#sched{cs_free=[C | Cs], xs=[X | Xs], id=ID}=S) -> +sched_assign(#sched{consumers_free=[], work=Xs}=S) -> S#sched{consumers_free=[], work=Xs}; +sched_assign(#sched{consumers_free=Cs, work=[]}=S) -> S#sched{consumers_free=Cs, work=[]}; +sched_assign(#sched{consumers_free=[C | Cs], work=[X | Xs], id=ID}=S) -> C ! {ID, job, X}, - sched_assign(S#sched{cs_free=Cs, xs=Xs}). + sched_assign(S#sched{consumers_free=Cs, work=Xs}). %% Tests ====================================================================== From 717d146dd9f0f5f37178c2f25b87124920cfdd91 Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Mon, 16 May 2022 12:42:24 -0400 Subject: [PATCH 6/8] Note perf drawback and some solution ideas --- src/blockchain.erl | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/blockchain.erl b/src/blockchain.erl index 0596b4fb47..27555e1d76 100644 --- a/src/blockchain.erl +++ b/src/blockchain.erl @@ -1468,6 +1468,24 @@ build(Height, Blockchain, N, Acc) -> ) -> [H, ...] when H :: blockchain_block:hash(). 
build_hash_chain(StopHash, StartBlock, #blockchain{db=DB}, CF) -> + %% XXX This parallelized build_hash_chain has a potential PERF DRAWBACK: + %% + %% Because it processes the WHOLE of blocks CF BEFORE knowing what will + %% actually be needed, if the caller needs a significantly smaller + %% segment than the whole - we end-up doing a lot of wasted + %% deserializations. + %% + %% Some ideas for solutions: + %% A. eliminate the deserialization step entirely, by maintaining + %% a rocksdb CF of ChildHash->ParentHash mappings; + %% B. dispatch between serial and parallel versions based on some + %% conditions/configs; + %% C. accept the redundancy, but create an opportunity to terminate + %% early once a long-enough segment is built - reimplement Parents + %% as a cache process which does two things: + %% 1. starts a background job which fills the cache with parents; + %% 2. accepts parent requests and either retrieves responses from + %% cache or computes them (updating the cache). Parents = maps:from_list( data_stream:pmap_to_bag( From ce26e25262fc9598ea72ba7aa1b9b99071df61a6 Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Tue, 24 May 2022 15:39:36 -0400 Subject: [PATCH 7/8] Note drawback of current scheduling strategy in pmap_to_bag --- src/data/data_stream.erl | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/data/data_stream.erl b/src/data/data_stream.erl index cde0a2a127..b0d2cfab65 100644 --- a/src/data/data_stream.erl +++ b/src/data/data_stream.erl @@ -84,6 +84,20 @@ pmap_to_bag(T, F, J) when is_function(T), is_function(F), is_integer(J), J > 0 - end, Producer = fun () -> + %% XXX Producer is racing against consumers. + %% + %% This hasn't (yet) caused a problem, but in theory it is + %% bad: producer is pouring into the scheduler's queue as + %% fast as possible, potentially faster than consumers can + %% pull from it, so heap usage could explode. + %% + %% Solution ideas: + %% A.
have the scheduler call the producer whenever more + %% work is asked for, but ... that can block the + %% scheduler, starving consumers; + %% B. produce in (configurable size) batches, pausing + %% production when batch is full and resuming when not + %% (this is probably the way to go). ok = iter(fun (X) -> SchedPid ! {SchedID, producer_output, X} end, T) end, Ys = From 7f98c55f5401af4edc995e637a7d561c8bf7406c Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Fri, 15 Jul 2022 16:06:33 -0400 Subject: [PATCH 8/8] Check for hash chain disjointness --- src/blockchain.erl | 65 ++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 63 insertions(+), 2 deletions(-) diff --git a/src/blockchain.erl b/src/blockchain.erl index 27555e1d76..f58e93d870 100644 --- a/src/blockchain.erl +++ b/src/blockchain.erl @@ -1498,14 +1498,45 @@ build_hash_chain(StopHash, StartBlock, #blockchain{db=DB}, CF) -> ) ), StartHash = blockchain_block:hash_block(StartBlock), - trace_lineage(Parents, StopHash, StartHash). + HashChain = trace_lineage(Parents, StopHash, StartHash), + %% TODO Predicate the disjointness check on a config? + case find_orphans(Parents) of + [] -> + ok; + [_|_]=Orphans -> + lager:warning( + "Disjoint blocks database. " + "Hash chain may not be valid. " + "Found orphan blocks: ~p, " + "Hash chain length: ~p.", + [Orphans, length(HashChain)] + ) + end, + HashChain. + +-spec find_orphans(#{B => B}) -> [B] when B :: binary(). +find_orphans(ChildToParent) -> + %% Given the pairs: #{A => 0, C => B} + %% and assuming the chain: [0, A, B, C] + %% we can see that B=>A is missing. + %% + %% Since we cannot actually assume, we can find non-genesis-parents which + %% have no parents themselves. In above example that would be B. Implying + %% the chain is disjoint. + [ + Parent + || + {_, Parent} <- maps:to_list(ChildToParent), + maps:find(Parent, ChildToParent) =:= error, % Parent has no parent. + lists:sum(binary_to_list(Parent)) > 0 % Parent is non-genesis. + ].
-spec trace_lineage(#{A => A}, A, A) -> [A, ...]. trace_lineage(Parents, Oldest, Youngest) -> (fun TraceBack ([Child | _]=Lineage) -> case maps:find(Child, Parents) of error -> Lineage; - {ok, Oldest} -> Lineage; + {ok, Oldest} -> Lineage; % Oldest ancestor is excluded. {ok, Parent} -> TraceBack([Parent | Lineage]) end end)([Youngest]). @@ -3443,4 +3474,34 @@ trace_lineage_test_() -> ) ]. +find_orphans_test_() -> + [ + ?_assertEqual( + %% [0, A, B, C] + %% #{A => 0, C => B} + %% missing B=>A, so B is orphan. + [<<"B">>], + find_orphans( + #{ + <<"A">> => <<0>>, + <<"C">> => <<"B">> + } + ) + ), + ?_assertEqual( + %% [X, A, B, C] + %% #{A => X, C => B} + %% missing B=>A, + %% X is not genesis, + %% so both X and B are orphans. + [<<"X">>, <<"B">>], + find_orphans( + #{ + <<"A">> => <<"X">>, + <<"C">> => <<"B">> + } + ) + ) + ]. + -endif.