@@ -1460,26 +1460,94 @@ build(Height, Blockchain, N, Acc) ->
14601460 end
14611461 end .
14621462
1463-
1464- - spec build_hash_chain (blockchain_block :hash (), blockchain_block :block (), blockchain (), rocksdb :cf_handle ()) -> [blockchain_block :hash (), ...].
1465- build_hash_chain (StopHash ,StartingBlock , Blockchain , CF ) ->
1466- BlockHash = blockchain_block :hash_block (StartingBlock ),
1467- ParentHash = blockchain_block :prev_hash (StartingBlock ),
1468- build_hash_chain_ (StopHash , CF , Blockchain , [ParentHash , BlockHash ]).
1469-
1470- - spec build_hash_chain_ (blockchain_block :hash (), rocksdb :cf_handle (), blockchain (), [blockchain_block :hash (), ...]) -> [blockchain_block :hash ()].
1471- build_hash_chain_ (StopHash , CF , Blockchain = # blockchain {db = DB }, [ParentHash |Tail ]= Acc ) ->
1472- case ParentHash == StopHash of
1473- true ->
1474- % % reached the end
1475- Tail ;
1476- false ->
1477- case rocksdb :get (DB , CF , ParentHash , []) of
1478- {ok , BinBlock } ->
1479- build_hash_chain_ (StopHash , CF , Blockchain , [blockchain_block :prev_hash (blockchain_block :deserialize (BinBlock ))|Acc ]);
1480- _ ->
1481- Acc
1463+ - spec build_hash_chain (
1464+ blockchain_block :hash (),
1465+ blockchain_block :block (),
1466+ blockchain (),
1467+ rocksdb :cf_handle ()
1468+ ) ->
1469+ [binary ()].
1470+ build_hash_chain (StopHash , StartBlock , # blockchain {db = DB }, CF ) ->
1471+ StartHash = blockchain_block :hash_block (StartBlock ),
1472+ build_hash_chain_in_parallel (DB , CF , StopHash , StartHash ).
1473+
1474+ - spec build_hash_chain_in_parallel (
1475+ rocksdb :db_handle (),
1476+ rocksdb :cf_handle (),
1477+ H ,
1478+ H
1479+ ) ->
1480+ [H ] when H :: blockchain_block :hash ().
1481+ build_hash_chain_in_parallel (DB , CF , Oldest , Youngest ) ->
1482+ Relations = digraph :new ([cyclic ]), % XXX 'cyclic' is a perf compromise:
1483+ % % 'acyclic' is what we'd ideally want, but it is an expensive option, as
1484+ % % it forces a cycles check ON EACH add_edge operation, so by using
1485+ % % 'cyclic' we're just trusting that our prev_hash relationships are all
1486+ % % correct, which they should be, if not - we have bigger problems.
1487+ Edges =
1488+ data_stream :pmap_to_bag (
1489+ rocksdb_stream (DB , CF ),
1490+ fun ({<<ChildHash /binary >>, <<ChildBlockBin /binary >>}) ->
1491+ ChildBlock = blockchain_block :deserialize (ChildBlockBin ),
1492+ <<ParentHash /binary >> = blockchain_block :prev_hash (ChildBlock ),
1493+ % % XXX Can't update digraph in parallel - no concurrent writes.
1494+ {ParentHash , ChildHash }
14821495 end
1496+ ),
1497+ _ = [digraph :add_vertex (Relations , Child ) || {_ , Child } <- Edges ],
1498+ _ = [digraph :add_edge (Relations , Parent , Child ) || {Parent , Child } <- Edges ],
1499+ HashChain =
1500+ % % XXX The simplest solution:
1501+ % % digraph:get_path(Relations, Oldest, Youngest)
1502+ % % is fine in the ideal case, but doesn't work in the case of a
1503+ % % broken path (when a block was missing and its parent could not
1504+ % % be looked up), in which case we need to return the longest
1505+ % % lineage found, traced back from the youngest hash. TraceBack
1506+ % % works for either scenario.
1507+ (fun TraceBack ([Child | _ ]= Lineage ) ->
1508+ case digraph :in_neighbours (Relations , Child ) of
1509+ [] ->
1510+ Lineage ;
1511+ [Oldest ] ->
1512+ Lineage ;
1513+ [Parent ] ->
1514+ TraceBack ([Parent | Lineage ]);
1515+ [_ |_ ]= Parents ->
1516+ error ({incorrect_chain , multiple_parents , Child , Parents })
1517+ end
1518+ end )([Youngest ]),
1519+ true = digraph :delete (Relations ),
1520+ HashChain .
1521+
1522+ - spec rocksdb_stream (rocksdb :db_handle (), rocksdb :cf_handle ()) ->
1523+ data_stream :t ({K :: binary (), V :: binary ()}).
1524+ rocksdb_stream (DB , CF ) ->
1525+ fun () ->
1526+ case rocksdb :iterator (DB , CF , []) of
1527+ {error , Reason } ->
1528+ error ({rocks_key_streaming_failure , Reason });
1529+ {ok , Iter } ->
1530+ case rocksdb :iterator_move (Iter , first ) of
1531+ {ok , K , V } ->
1532+ {some , {{K , V }, rocksdb_stream (Iter )}};
1533+ Error ->
1534+ error ({rocks_key_streaming_failure , Error })
1535+ end
1536+ end
1537+ end .
1538+
1539+ - spec rocksdb_stream (rocksdb :itr_handle ()) ->
1540+ data_stream :t ({K :: binary (), V :: binary ()}).
1541+ rocksdb_stream (Iter ) ->
1542+ fun () ->
1543+ case rocksdb :iterator_move (Iter , next ) of
1544+ {ok , K , V } ->
1545+ {some , {{K , V }, rocksdb_stream (Iter )}};
1546+ {error , invalid_iterator } ->
1547+ none ;
1548+ {error , Reason } ->
1549+ error ({rocks_key_streaming_failure , Reason })
1550+ end
14831551 end .
14841552
14851553- spec fold_chain (fun ((Blk :: blockchain_block :block (), AccIn :: any ()) -> NewAcc :: any ()),
0 commit comments