diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java index 27fa4e2dff7..8a8bb833a68 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java @@ -168,18 +168,26 @@ public boolean isThrottlable() { && this.type != OpCode.createSession; } + private long approximateSize = 0; + public byte[] getSerializeData() { if (this.hdr == null) { return null; } try { - return Util.marshallTxnEntry(this.hdr, this.txn, this.txnDigest); + byte[] bytes = Util.marshallTxnEntry(this.hdr, this.txn, this.txnDigest); + approximateSize = bytes.length; + return bytes; } catch (IOException e) { LOG.error("This really should be impossible.", e); return new byte[32]; } } + public long getApproximateSize() { + return approximateSize; + } + /** * If this is a create or close request for a local-only session. */ diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java index d98c97f2c07..1b61620e8bd 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java @@ -33,6 +33,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; @@ -92,9 +93,16 @@ public class ZKDatabase { public static final String COMMIT_LOG_COUNT = "zookeeper.commitLogCount"; public static final int DEFAULT_COMMIT_LOG_COUNT = 500; public int commitLogCount; + public static final String COMMIT_LOG_SIZE = "zookeeper.commitLogSize"; + public static final double DEFAULT_COMMIT_LOG_SIZE = Runtime.getRuntime().maxMemory() * 0.2; + public double commitLogSize; protected Queue committedLog = new ArrayDeque<>(); protected ReentrantReadWriteLock logLock = new ReentrantReadWriteLock(); private volatile boolean initialized = false; + /** + * committedLog bytes size. + */ + private AtomicLong totalCommittLogSize = new AtomicLong(0); /** * Number of txn since last snapshot; @@ -137,12 +145,11 @@ public ZKDatabase(FileTxnSnapLog snapLog) { commitLogCount = Integer.parseInt( System.getProperty(COMMIT_LOG_COUNT, Integer.toString(DEFAULT_COMMIT_LOG_COUNT))); - if (commitLogCount < DEFAULT_COMMIT_LOG_COUNT) { + if (commitLogCount <= 0) { commitLogCount = DEFAULT_COMMIT_LOG_COUNT; LOG.warn( - "The configured commitLogCount {} is less than the recommended {}, going to use the recommended one", - COMMIT_LOG_COUNT, - DEFAULT_COMMIT_LOG_COUNT); + "The configured commitLogCount {} is less than zero, going to use the recommended value {}", + COMMIT_LOG_COUNT, DEFAULT_COMMIT_LOG_COUNT); } } catch (NumberFormatException e) { LOG.error( @@ -151,7 +158,24 @@ public ZKDatabase(FileTxnSnapLog snapLog) { DEFAULT_COMMIT_LOG_COUNT); commitLogCount = DEFAULT_COMMIT_LOG_COUNT; } - LOG.info("{}={}", COMMIT_LOG_COUNT, commitLogCount); + try { + commitLogSize = Double.parseDouble( + System.getProperty(COMMIT_LOG_SIZE, + Double.toString(DEFAULT_COMMIT_LOG_SIZE))); + if (commitLogSize <= 0) { + commitLogSize = DEFAULT_COMMIT_LOG_SIZE; + LOG.warn( + "The configured commitLogSize {} is less than the zero, going to use the recommended value {}", + COMMIT_LOG_SIZE, DEFAULT_COMMIT_LOG_SIZE); + } + } catch (NumberFormatException e) { + LOG.error( + "Error parsing {} - use default value {}", + COMMIT_LOG_SIZE, + DEFAULT_COMMIT_LOG_SIZE); + commitLogSize = DEFAULT_COMMIT_LOG_SIZE; + } + LOG.info("{}={}, {}={}", COMMIT_LOG_COUNT, commitLogCount, COMMIT_LOG_SIZE, commitLogSize); } /** @@ -182,6 +206,7 @@ public void clear() { try { lock.lock(); committedLog.clear(); + totalCommittLogSize.set(0); } finally { lock.unlock(); } @@ -320,17 +345,24 @@ public void addCommittedProposal(Request request) { WriteLock wl = logLock.writeLock(); try { wl.lock(); - if (committedLog.size() > commitLogCount) { - committedLog.remove(); - minCommittedLog = committedLog.peek().getZxid(); - } if (committedLog.isEmpty()) { minCommittedLog = request.zxid; - maxCommittedLog = request.zxid; } PureRequestProposal p = new PureRequestProposal(request); committedLog.add(p); maxCommittedLog = p.getZxid(); + totalCommittLogSize.addAndGet(request.getApproximateSize()); + while (committedLog.size() > 0 + && (committedLog.size() > commitLogCount || totalCommittLogSize.get() > commitLogSize)) { + committedLog.remove(); + Proposal peek = committedLog.peek(); + if (peek == null) { + minCommittedLog = 0; + maxCommittedLog = 0; + } else { + minCommittedLog = p.getZxid(); + } + } } finally { wl.unlock(); }