This repository was archived by the owner on Dec 5, 2024. It is now read-only.

few optimizations on pruning #1278

Open
wants to merge 1 commit into base: develop
64 changes: 31 additions & 33 deletions ethereumj-core/src/main/java/org/ethereum/crypto/HashUtil.java
@@ -26,6 +26,7 @@
import org.spongycastle.crypto.Digest;
import org.spongycastle.crypto.digests.RIPEMD160Digest;
import org.spongycastle.util.encoders.Hex;

import java.math.BigInteger;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
@@ -35,13 +36,12 @@

import static java.util.Arrays.copyOfRange;
import static org.ethereum.util.ByteUtil.EMPTY_BYTE_ARRAY;
import static org.ethereum.util.ByteUtil.bigIntegerToBytes;
import static org.ethereum.util.ByteUtil.bytesToBigInteger;

public class HashUtil {

private static final Logger LOG = LoggerFactory.getLogger(HashUtil.class);

private static final String[] HEX = hexDictionnary();
public static final byte[] EMPTY_DATA_HASH;
public static final byte[] EMPTY_LIST_HASH;
public static final byte[] EMPTY_TRIE_HASH;
@@ -63,8 +63,7 @@ public class HashUtil {
}

/**
* @param input
* - data for hashing
* @param input - data for hashing
* @return - sha256 hash of the data
*/
public static byte[] sha256(byte[] input) {
@@ -105,13 +104,10 @@ public static byte[] sha3(byte[] input1, byte[] input2) {

/**
* hashing chunk of the data
*
* @param input
* - data for hash
* @param start
* - start of hashing chunk
* @param length
* - length of hashing chunk
*
* @param input - data for hash
* @param start - start of hashing chunk
* @param length - length of hashing chunk
* @return - keccak hash of the chunk
*/
public static byte[] sha3(byte[] input, int start, int length) {
@@ -139,8 +135,7 @@ public static byte[] sha512(byte[] input) {
}

/**
* @param data
* - message to hash
* @param data - message to hash
* @return - reipmd160 hash of the message
*/
public static byte[] ripemd160(byte[] data) {
@@ -157,9 +152,8 @@ public static byte[] ripemd160(byte[] data) {
/**
* Calculates RIGTMOST160(SHA3(input)). This is used in address
* calculations. *
*
* @param input
* - data
*
* @param input - data
* @return - 20 right bytes of the hash keccak of the data
*/
public static byte[] sha3omit12(byte[] input) {
@@ -170,10 +164,8 @@ public static byte[] sha3omit12(byte[] input) {
/**
* The way to calculate new address inside ethereum
*
* @param addr
* - creating address
* @param nonce
* - nonce of creating address
* @param addr - creating address
* @param nonce - nonce of creating address
* @return new address
*/
public static byte[] calcNewAddr(byte[] addr, byte[] nonce) {
@@ -189,8 +181,8 @@ public static byte[] calcNewAddr(byte[] addr, byte[] nonce) {
* sha3(0xff ++ msg.sender ++ salt ++ sha3(init_code)))[12:]
*
* @param senderAddr - creating address
* @param initCode - contract init code
* @param salt - salt to make different result addresses
* @param initCode - contract init code
* @param salt - salt to make different result addresses
* @return new address
*/
public static byte[] calcSaltAddr(byte[] senderAddr, byte[] initCode, byte[] salt) {
@@ -209,11 +201,9 @@ public static byte[] calcSaltAddr(byte[] senderAddr, byte[] initCode, byte[] sal
}

/**
* @see #doubleDigest(byte[], int, int)
*
* @param input
* -
* @param input -
* @return -
* @see #doubleDigest(byte[], int, int)
*/
public static byte[] doubleDigest(byte[] input) {
return doubleDigest(input, 0, input.length);
@@ -224,12 +214,9 @@ public static byte[] doubleDigest(byte[] input) {
* resulting hash again. This is standard procedure in Bitcoin. The
* resulting hash is in big endian form.
*
* @param input
* -
* @param offset
* -
* @param length
* -
* @param input -
* @param offset -
* @param length -
* @return -
*/
public static byte[] doubleDigest(byte[] input, int offset, int length) {
@@ -272,7 +259,18 @@ public static byte[] randomHash() {
return randomHash;
}


public static String shortHash(byte[] hash) {
return Hex.toHexString(hash).substring(0, 6);
return HEX[hash[0] & 0xFF]
+ HEX[hash[1] & 0xFF]
+ HEX[hash[2] & 0xFF];
}

private static String[] hexDictionnary() {
String[] values = new String[0xff + 0x1];
for (int i = 0; i <= 0xff; i++) {
values[i] = String.format("%02x", i & 0xFF);
}
return values;
}
}
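
The change above swaps Hex.toHexString(hash).substring(0, 6) for three lookups into a precomputed table of the 256 two-character hex strings. Below is a minimal standalone sketch of the same technique; the class and method names are illustrative, not part of the PR.

import java.util.Arrays;

public final class ShortHashDemo {

    // Precompute "00".."ff" once; index i holds the two-character hex form of byte value i.
    private static final String[] HEX = buildHexTable();

    private static String[] buildHexTable() {
        String[] values = new String[256];
        for (int i = 0; i < 256; i++) {
            values[i] = String.format("%02x", i);
        }
        return values;
    }

    // Render the first three bytes as six hex characters without encoding the whole array.
    static String shortHash(byte[] hash) {
        return HEX[hash[0] & 0xFF] + HEX[hash[1] & 0xFF] + HEX[hash[2] & 0xFF];
    }

    public static void main(String[] args) {
        byte[] hash = new byte[32];
        Arrays.fill(hash, (byte) 0xab);
        System.out.println(shortHash(hash)); // prints "ababab"
    }
}

The table costs 256 small strings up front and avoids building a 64-character hex string plus a substring on every shortHash call.
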
147 changes: 75 additions & 72 deletions ethereumj-core/src/main/java/org/ethereum/db/prune/Pruner.java
@@ -21,65 +21,67 @@
* This class is responsible for state pruning.
*
* <p>
* Taking the information supplied by {@link #journal} (check {@link JournalSource} for details)
* removes unused nodes from the {@link #storage}.
* There are two types of unused nodes:
* nodes not references in the trie after N blocks from the current one and
* nodes which were inserted in the forks that finally were not accepted
* Taking the information supplied by {@link #journal} (check {@link JournalSource} for details)
* removes unused nodes from the {@link #storage}.
* There are two types of unused nodes:
* nodes not references in the trie after N blocks from the current one and
* nodes which were inserted in the forks that finally were not accepted
*
* <p>
* Each prune session uses a certain chain {@link Segment}
* which is going to be 'pruned'. To be confident that live nodes won't be removed,
* pruner must be initialized with the top of the chain, see {@link #init(List, int)}}.
* And after that it must be fed with each newly processed block, see {@link #feed(JournalSource.Update)}.
* {@link QuotientFilter} ({@link CountingQuotientFilter} implementation in particular) instance is used to
* efficiently keep upcoming inserts in memory and protect newly inserted nodes from being deleted during
* prune session. The filter is constantly recycled in {@link #prune(Segment)} method.
* Each prune session uses a certain chain {@link Segment}
* which is going to be 'pruned'. To be confident that live nodes won't be removed,
* pruner must be initialized with the top of the chain, see {@link #init(List, int)}}.
* And after that it must be fed with each newly processed block, see {@link #feed(JournalSource.Update)}.
* {@link QuotientFilter} ({@link CountingQuotientFilter} implementation in particular) instance is used to
* efficiently keep upcoming inserts in memory and protect newly inserted nodes from being deleted during
* prune session. The filter is constantly recycled in {@link #prune(Segment)} method.
*
* <p>
* When 'prune.maxDepth' param is quite big, it becomes not efficient to keep reverted nodes until prune block number has come.
* Hence Pruner has two step mode to mitigate memory consumption, second step is initiated by {@link #withSecondStep(List, int)}.
* In that mode nodes from not accepted forks are deleted from storage immediately but main chain deletions are
* postponed for the second step.
* Second step uses another one instance of QuotientFilter with less memory impact, check {@link #instantiateFilter(int, int)}.
* When 'prune.maxDepth' param is quite big, it becomes not efficient to keep reverted nodes until prune block number has come.
* Hence Pruner has two step mode to mitigate memory consumption, second step is initiated by {@link #withSecondStep(List, int)}.
* In that mode nodes from not accepted forks are deleted from storage immediately but main chain deletions are
* postponed for the second step.
* Second step uses another one instance of QuotientFilter with less memory impact, check {@link #instantiateFilter(int, int)}.
*
* <p>
* Basically, prune session initiated by {@link #prune(Segment)} method
* consists of 3 steps: first, it reverts forks, then it persists main chain,
* after that it recycles {@link #journal} by removing processed updates from it.
* During the session reverted and deleted nodes are propagated to the {@link #storage} immediately.
* Basically, prune session initiated by {@link #prune(Segment)} method
* consists of 3 steps: first, it reverts forks, then it persists main chain,
* after that it recycles {@link #journal} by removing processed updates from it.
* During the session reverted and deleted nodes are propagated to the {@link #storage} immediately.
*
* @author Mikhail Kalinin
* @since 25.01.2018
*/
public class Pruner {
public final class Pruner {

private static final Logger logger = LoggerFactory.getLogger("prune");

Source<byte[], JournalSource.Update> journal;
Source<byte[], ?> storage;
QuotientFilter filter;
QuotientFilter distantFilter;
boolean ready = false;
private final Source<byte[], JournalSource.Update> journal;
private final Source<byte[], ?> storage;
private QuotientFilter filter;
private QuotientFilter distantFilter;
private boolean ready = false;

private static class Stats {
int collisions = 0;
int deleted = 0;
double load = 0;

@Override
public String toString() {
return String.format("load %.4f, collisions %d, deleted %d", load, collisions, deleted);
}
}
Stats maxLoad = new Stats();
Stats maxCollisions = new Stats();
int maxKeysInMemory = 0;
int statsTracker = 0;

Stats distantMaxLoad = new Stats();
Stats distantMaxCollisions = new Stats();
private final Stats maxLoad = new Stats();
private final Stats maxCollisions = new Stats();
private int maxKeysInMemory = 0;
private int statsTracker = 0;

private final Stats distantMaxLoad = new Stats();
private final Stats distantMaxCollisions = new Stats();

public Pruner(Source<byte[], JournalSource.Update> journal, Source<byte[], ?> storage) {
public Pruner(final Source<byte[], JournalSource.Update> journal, final Source<byte[], ?> storage) {
this.storage = storage;
this.journal = journal;
}
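
The class javadoc above describes feeding every new block's inserted keys into a CountingQuotientFilter and, during a prune session, deleting a node from storage only if the filter no longer reports it as inserted by a newer block. The sketch below illustrates that protect-then-recycle idea with an exact counting map standing in for the probabilistic filter; the class, fields and method shapes are illustrative assumptions, not ethereumj's API.

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class FilterProtectedPruneSketch {

    // Exact counting stand-in for CountingQuotientFilter: key -> number of recent blocks inserting it.
    private final Map<ByteBuffer, Integer> recentInserts = new HashMap<>();
    // Stand-in for the node storage being pruned.
    private final Set<ByteBuffer> storage = new HashSet<>();

    // Every newly processed block is fed to the sketch (cf. Pruner#feed).
    void feed(List<byte[]> insertedKeys) {
        for (byte[] key : insertedKeys) {
            storage.add(ByteBuffer.wrap(key.clone()));
            recentInserts.merge(ByteBuffer.wrap(key.clone()), 1, Integer::sum);
        }
    }

    // Once a block falls out of the prune window (cf. Pruner#prune): recycle its own inserts
    // out of the "filter", then apply its deletions only for keys no newer block has re-inserted.
    void prune(List<byte[]> insertedKeys, List<byte[]> deletedKeys) {
        for (byte[] key : insertedKeys) {
            recentInserts.computeIfPresent(ByteBuffer.wrap(key), (k, count) -> count > 1 ? count - 1 : null);
        }
        for (byte[] key : deletedKeys) {
            ByteBuffer wrapped = ByteBuffer.wrap(key);
            // Delete only if no recent block (re-)inserted this key; otherwise it stays protected.
            if (!recentInserts.containsKey(wrapped)) {
                storage.remove(wrapped);
            }
        }
    }

    public static void main(String[] args) {
        FilterProtectedPruneSketch pruner = new FilterProtectedPruneSketch();
        byte[] shared = {1};
        byte[] stale = {2};
        pruner.feed(List.of(shared, stale));   // block N inserts both keys
        pruner.feed(List.of(shared));          // block N+1 re-inserts 'shared'
        // Pruning block N attempts to delete both keys; 'shared' survives because block N+1 still protects it.
        pruner.prune(List.of(shared, stale), List.of(shared, stale));
        System.out.println(pruner.storage.size()); // prints 1
    }
}

The real Pruner uses a fixed-size quotient filter instead of an exact map, trading certainty for bounded memory: a false positive only delays a deletion, never removes a live node.
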
@@ -88,17 +90,17 @@ public boolean isReady() {
return ready;
}

public boolean init(List<byte[]> forkWindow, int sizeInBlocks) {
public boolean init(final List<byte[]> forkWindow, final int sizeInBlocks) {
if (ready) return true;

if (!forkWindow.isEmpty() && journal.get(forkWindow.get(0)) == null) {
logger.debug("pruner init aborted: can't fetch update " + toHexString(forkWindow.get(0)));
return false;
}

QuotientFilter filter = instantiateFilter(sizeInBlocks, FILTER_ENTRIES_FORK);
final QuotientFilter filter = instantiateFilter(sizeInBlocks, FILTER_ENTRIES_FORK);
for (byte[] hash : forkWindow) {
JournalSource.Update update = journal.get(hash);
final JournalSource.Update update = journal.get(hash);
if (update == null) {
logger.debug("pruner init aborted: can't fetch update " + toHexString(hash));
return false;
@@ -110,20 +112,20 @@ public boolean init(List<byte[]> forkWindow, int sizeInBlocks) {
return ready = true;
}

public boolean withSecondStep() {
private boolean withSecondStep() {
return distantFilter != null;
}

public void withSecondStep(List<byte[]> mainChainWindow, int sizeInBlocks) {
public void withSecondStep(final List<byte[]> mainChainWindow, final int sizeInBlocks) {
if (!ready) return;

QuotientFilter filter = instantiateFilter(sizeInBlocks, FILTER_ENTRIES_DISTANT);
final QuotientFilter filter = instantiateFilter(sizeInBlocks, FILTER_ENTRIES_DISTANT);

if (!mainChainWindow.isEmpty()) {
int i = mainChainWindow.size() - 1;
for (; i >= 0; i--) {
byte[] hash = mainChainWindow.get(i);
JournalSource.Update update = journal.get(hash);
final JournalSource.Update update = journal.get(hash);
if (update == null) {
break;
}
@@ -141,36 +143,35 @@ public void withSecondStep(List<byte[]> mainChainWindow, int sizeInBlocks) {
private static final int FILTER_ENTRIES_FORK = 1 << 13; // approximate number of nodes per block
private static final int FILTER_ENTRIES_DISTANT = 1 << 11;
private static final int FILTER_MAX_SIZE = Integer.MAX_VALUE >> 1; // that filter will consume ~3g of mem
private QuotientFilter instantiateFilter(int blocksCnt, int entries) {

private QuotientFilter instantiateFilter(final int blocksCnt, int entries) {
int size = Math.min(entries * blocksCnt, FILTER_MAX_SIZE);
return CountingQuotientFilter.create(size, size);
}

public boolean init(byte[] ... upcoming) {
public boolean init(byte[]... upcoming) {
return init(Arrays.asList(upcoming), 192);
}

public void feed(JournalSource.Update update) {
public void feed(final JournalSource.Update update) {
if (ready)
update.getInsertedKeys().forEach(filter::insert);
}

public void prune(Segment segment) {
public void prune(final Segment segment) {
if (!ready) return;
assert segment.isComplete();

logger.trace("prune " + segment);

long t = System.currentTimeMillis();
Pruning pruning = new Pruning();
final Pruning pruning = new Pruning();
// important for fork management, check Pruning#insertedInMainChain and Pruning#insertedInForks for details
segment.forks.sort((f1, f2) -> Long.compare(f1.startNumber(), f2.startNumber()));
segment.forks.forEach(pruning::revert);

// delete updates
for (Chain chain : segment.forks) {
chain.getHashes().forEach(journal::delete);
}
segment.forks.forEach(chain -> chain.getHashes().forEach(journal::delete));

int nodesPostponed = 0;
if (withSecondStep()) {
@@ -180,13 +181,14 @@ public void prune(Segment segment) {
segment.main.getHashes().forEach(journal::delete);
}

if (logger.isTraceEnabled()) logger.trace("nodes {}, keys in mem: {}, filter load: {}/{}: {}, distinct collisions: {}",
(withSecondStep() ? "postponed: " + nodesPostponed : "deleted: " + pruning.nodesDeleted),
pruning.insertedInForks.size() + pruning.insertedInMainChain.size(),
((CountingQuotientFilter) filter).getEntryNumber(), ((CountingQuotientFilter) filter).getMaxInsertions(),
String.format("%.4f", (double) ((CountingQuotientFilter) filter).getEntryNumber() /
((CountingQuotientFilter) filter).getMaxInsertions()),
((CountingQuotientFilter) filter).getCollisionNumber());
if (logger.isTraceEnabled())
logger.trace("nodes {}, keys in mem: {}, filter load: {}/{}: {}, distinct collisions: {}",
(withSecondStep() ? "postponed: " + nodesPostponed : "deleted: " + pruning.nodesDeleted),
pruning.insertedInForks.size() + pruning.insertedInMainChain.size(),
((CountingQuotientFilter) filter).getEntryNumber(), ((CountingQuotientFilter) filter).getMaxInsertions(),
String.format("%.4f", (double) ((CountingQuotientFilter) filter).getEntryNumber() /
((CountingQuotientFilter) filter).getMaxInsertions()),
((CountingQuotientFilter) filter).getCollisionNumber());

if (logger.isDebugEnabled()) {
int collisions = ((CountingQuotientFilter) filter).getCollisionNumber();
@@ -214,7 +216,7 @@ public void prune(Segment segment) {
logger.trace(segment + " pruned in {}ms", System.currentTimeMillis() - t);
}

public void persist(byte[] hash) {
public void persist(final byte[] hash) {
if (!ready || !withSecondStep()) return;

logger.trace("persist [{}]", toHexString(hash));
@@ -259,17 +261,18 @@ public void persist(byte[] hash) {
}
}

if (logger.isTraceEnabled()) logger.trace("[{}] persisted in {}ms: {}/{} ({}%) nodes deleted, filter load: {}/{}: {}, distinct collisions: {}",
HashUtil.shortHash(hash), System.currentTimeMillis() - t, nodesDeleted, update.getDeletedKeys().size(),
nodesDeleted * 100 / update.getDeletedKeys().size(),
((CountingQuotientFilter) distantFilter).getEntryNumber(),
((CountingQuotientFilter) distantFilter).getMaxInsertions(),
String.format("%.4f", (double) ((CountingQuotientFilter) distantFilter).getEntryNumber() /
((CountingQuotientFilter) distantFilter).getMaxInsertions()),
((CountingQuotientFilter) distantFilter).getCollisionNumber());
if (logger.isTraceEnabled())
logger.trace("[{}] persisted in {}ms: {}/{} ({}%) nodes deleted, filter load: {}/{}: {}, distinct collisions: {}",
HashUtil.shortHash(hash), System.currentTimeMillis() - t, nodesDeleted, update.getDeletedKeys().size(),
nodesDeleted * 100 / update.getDeletedKeys().size(),
((CountingQuotientFilter) distantFilter).getEntryNumber(),
((CountingQuotientFilter) distantFilter).getMaxInsertions(),
String.format("%.4f", (double) ((CountingQuotientFilter) distantFilter).getEntryNumber() /
((CountingQuotientFilter) distantFilter).getMaxInsertions()),
((CountingQuotientFilter) distantFilter).getCollisionNumber());
}

private int postpone(Chain chain) {
private int postpone(final Chain chain) {
if (logger.isTraceEnabled())
logger.trace("<~ postponing " + chain + ": " + strSample(chain.getHashes()));

@@ -291,7 +294,7 @@ private int postpone(Chain chain) {
return nodesPostponed;
}

private int persist(Chain chain) {
private int persist(final Chain chain) {
if (logger.isTraceEnabled())
logger.trace("<~ persisting " + chain + ": " + strSample(chain.getHashes()));

@@ -316,7 +319,7 @@ private int persist(Chain chain) {
return nodesDeleted;
}

private String strSample(Collection<byte[]> hashes) {
private String strSample(final Collection<byte[]> hashes) {
String sample = hashes.stream().limit(3)
.map(HashUtil::shortHash).collect(Collectors.joining(", "));
if (hashes.size() > 3) {
@@ -325,15 +328,15 @@ private String strSample(Collection<byte[]> hashes) {
return sample;
}

private class Pruning {
private final class Pruning {

// track nodes inserted and deleted in forks
// to avoid deletion of those nodes which were originally inserted in the main chain
Set<byte[]> insertedInMainChain = new ByteArraySet();
Set<byte[]> insertedInForks = new ByteArraySet();
private final Set<byte[]> insertedInMainChain = new ByteArraySet();
private final Set<byte[]> insertedInForks = new ByteArraySet();
int nodesDeleted = 0;

private void revert(Chain chain) {
private void revert(final Chain chain) {
if (logger.isTraceEnabled())
logger.trace("<~ reverting " + chain + ": " + strSample(chain.getHashes()));