Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZOOKEEPER-4801: Add payload size limitation for committed log #2130

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -168,18 +168,26 @@ public boolean isThrottlable() {
&& this.type != OpCode.createSession;
}

private long approximateSize = 0;

public byte[] getSerializeData() {
if (this.hdr == null) {
return null;
}
try {
return Util.marshallTxnEntry(this.hdr, this.txn, this.txnDigest);
byte[] bytes = Util.marshallTxnEntry(this.hdr, this.txn, this.txnDigest);
approximateSize = bytes.length;
return bytes;
} catch (IOException e) {
LOG.error("This really should be impossible.", e);
return new byte[32];
}
}

public long getApproximateSize() {
return approximateSize;
}

/**
* If this is a create or close request for a local-only session.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
Expand Down Expand Up @@ -92,9 +93,16 @@ public class ZKDatabase {
public static final String COMMIT_LOG_COUNT = "zookeeper.commitLogCount";
public static final int DEFAULT_COMMIT_LOG_COUNT = 500;
public int commitLogCount;
public static final String COMMIT_LOG_SIZE = "zookeeper.commitLogSize";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should document this in zookeeperAdmin.md.

public static final double DEFAULT_COMMIT_LOG_SIZE = Runtime.getRuntime().maxMemory() * 0.2;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it evaluated as bytes ? Should it better to be long ?

public double commitLogSize;
protected Queue<Proposal> committedLog = new ArrayDeque<>();
protected ReentrantReadWriteLock logLock = new ReentrantReadWriteLock();
private volatile boolean initialized = false;
/**
* committedLog bytes size.
*/
private AtomicLong totalCommittLogSize = new AtomicLong(0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: Should be totalCommitLogSize ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And long is enough ?


/**
* Number of txn since last snapshot;
Expand Down Expand Up @@ -137,12 +145,11 @@ public ZKDatabase(FileTxnSnapLog snapLog) {
commitLogCount = Integer.parseInt(
System.getProperty(COMMIT_LOG_COUNT,
Integer.toString(DEFAULT_COMMIT_LOG_COUNT)));
if (commitLogCount < DEFAULT_COMMIT_LOG_COUNT) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default value is 500 which is the recommended minimum. (https://zookeeper.apache.org/doc/r3.9.2/zookeeperAdmin.html)

I do think 500 is pretty small for ZooKeeper already. It is meaningless to support 1 as minimum. Is it better to rename DEFAULT_COMMIT_LOG_COUNT to MIN_COMMIT_LOG_COUNT ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would be the default then?

if (commitLogCount <= 0) {
commitLogCount = DEFAULT_COMMIT_LOG_COUNT;
LOG.warn(
"The configured commitLogCount {} is less than the recommended {}, going to use the recommended one",
COMMIT_LOG_COUNT,
DEFAULT_COMMIT_LOG_COUNT);
"The configured commitLogCount {} is less than zero, going to use the recommended value {}",
COMMIT_LOG_COUNT, DEFAULT_COMMIT_LOG_COUNT);
}
} catch (NumberFormatException e) {
LOG.error(
Expand All @@ -151,7 +158,24 @@ public ZKDatabase(FileTxnSnapLog snapLog) {
DEFAULT_COMMIT_LOG_COUNT);
commitLogCount = DEFAULT_COMMIT_LOG_COUNT;
}
LOG.info("{}={}", COMMIT_LOG_COUNT, commitLogCount);
try {
commitLogSize = Double.parseDouble(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

backlog: I think we cloud introduce something similar to Time::parseTimeInterval in a later time. The size may look large in case of bytes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense to me and I think it's doable in this PR too since we're already touching this part. The parser is already merged and could be easily shared.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will Double.parseDouble() accept floating point numbers from the config?
Shouldn't we prevent that?

System.getProperty(COMMIT_LOG_SIZE,
Double.toString(DEFAULT_COMMIT_LOG_SIZE)));
if (commitLogSize <= 0) {
commitLogSize = DEFAULT_COMMIT_LOG_SIZE;
LOG.warn(
"The configured commitLogSize {} is less than the zero, going to use the recommended value {}",
COMMIT_LOG_SIZE, DEFAULT_COMMIT_LOG_SIZE);
}
} catch (NumberFormatException e) {
LOG.error(
"Error parsing {} - use default value {}",
COMMIT_LOG_SIZE,
DEFAULT_COMMIT_LOG_SIZE);
commitLogSize = DEFAULT_COMMIT_LOG_SIZE;
}
LOG.info("{}={}, {}={}", COMMIT_LOG_COUNT, commitLogCount, COMMIT_LOG_SIZE, commitLogSize);
}

/**
Expand Down Expand Up @@ -182,6 +206,7 @@ public void clear() {
try {
lock.lock();
committedLog.clear();
totalCommittLogSize.set(0);
} finally {
lock.unlock();
}
Expand Down Expand Up @@ -320,17 +345,24 @@ public void addCommittedProposal(Request request) {
WriteLock wl = logLock.writeLock();
try {
wl.lock();
if (committedLog.size() > commitLogCount) {
committedLog.remove();
minCommittedLog = committedLog.peek().getZxid();
}
if (committedLog.isEmpty()) {
minCommittedLog = request.zxid;
maxCommittedLog = request.zxid;
}
PureRequestProposal p = new PureRequestProposal(request);
committedLog.add(p);
maxCommittedLog = p.getZxid();
totalCommittLogSize.addAndGet(request.getApproximateSize());
while (committedLog.size() > 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

!committedLog.isEmpty() ?

&& (committedLog.size() > commitLogCount || totalCommittLogSize.get() > commitLogSize)) {
committedLog.remove();
Proposal peek = committedLog.peek();
if (peek == null) {
minCommittedLog = 0;
maxCommittedLog = 0;
} else {
minCommittedLog = p.getZxid();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be peek.getZxid() ?

}
}
} finally {
wl.unlock();
}
Expand Down
Loading