From f5113dad530bdef1dc604c0ac537783750c5deb7 Mon Sep 17 00:00:00 2001 From: Damien Diederen Date: Thu, 17 Jun 2021 15:55:18 +0200 Subject: [PATCH 01/11] ZOOKEEPER-4306: BinaryOutputArchive: Add note about UTF-8-like encoding --- .../java/org/apache/jute/BinaryOutputArchive.java | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/zookeeper-jute/src/main/java/org/apache/jute/BinaryOutputArchive.java b/zookeeper-jute/src/main/java/org/apache/jute/BinaryOutputArchive.java index 52db1bc3c45..d5edecb90d4 100644 --- a/zookeeper-jute/src/main/java/org/apache/jute/BinaryOutputArchive.java +++ b/zookeeper-jute/src/main/java/org/apache/jute/BinaryOutputArchive.java @@ -78,11 +78,16 @@ public void writeDouble(double d, String tag) throws IOException { } /** - * create our own char encoder to utf8. This is faster - * then string.getbytes(UTF8). + * Encodes the characters of {@code s} into an UTF-8-like byte + * sequence. This method reuses a local {@code ByteBuffer} and + * should seldom allocate. * - * @param s the string to encode into utf8 - * @return utf8 byte sequence. + *

Note that this is not a full-blown UTF-8 implementation; + * notably, it does not decode UTF-16 surrogate pairs, and rather + * encodes each {@code char} individually. + * + * @param s the string to encode + * @return the resulting byte sequence */ private ByteBuffer stringToByteBuffer(CharSequence s) { bb.clear(); From 470da35e490949c445af18e956d7c58b9388923c Mon Sep 17 00:00:00 2001 From: Damien Diederen Date: Thu, 17 Jun 2021 11:19:59 +0200 Subject: [PATCH 02/11] ZOOKEEPER-4306: BinaryOutputArchive: Add serialized String measurement API --- .../org/apache/jute/BinaryOutputArchive.java | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/zookeeper-jute/src/main/java/org/apache/jute/BinaryOutputArchive.java b/zookeeper-jute/src/main/java/org/apache/jute/BinaryOutputArchive.java index d5edecb90d4..e46013f337c 100644 --- a/zookeeper-jute/src/main/java/org/apache/jute/BinaryOutputArchive.java +++ b/zookeeper-jute/src/main/java/org/apache/jute/BinaryOutputArchive.java @@ -115,6 +115,35 @@ private ByteBuffer stringToByteBuffer(CharSequence s) { return bb; } + /** + * Computes the exact payload size of a character sequence as + * encoded by the {@link #stringToByteBuffer(CharSequence) + * stringToByteBuffer} method, without any "length descriptor". + * + *

Note that the algorithm used by {@code stringToByteBuffer} + * does not match {@code StandardCharsets.UTF_8}; this method + * "emulates" the former. + * + * @param s the string to encode + * @return the serialized payload size in bytes + * @throws ArithmeticException if the result overflows an int + */ + private static int serializedStringPayloadSize(CharSequence s) { + int size = 0; + final int len = s.length(); + for (int i = 0; i < len; i++) { + char c = s.charAt(i); + if (c < 0x80) { + size = Math.addExact(size, 1); + } else if (c < 0x800) { + size = Math.addExact(size, 2); + } else { + size = Math.addExact(size, 3); + } + } + return size; + } + public void writeString(String s, String tag) throws IOException { if (s == null) { writeInt(-1, "len"); @@ -127,6 +156,22 @@ public void writeString(String s, String tag) throws IOException { dataSize += strLen; } + /** + * Computes the exact serialization size of a string. + * + * @param s the string to encode, potentially {@code null} + * @return the serialization size in bytes + * @throws ArithmeticException if the result overflows an int + * + * @see #serializedStringPayloadSize(CharSequence) + */ + public static int serializedStringSize(String s) { + int payloadSize = s == null ? 0 : serializedStringPayloadSize(s); + + // length descriptor + payload. + return Math.addExact(4, payloadSize); + } + public void writeBuffer(byte[] barr, String tag) throws IOException { if (barr == null) { From 289293475f19434369388aeb6bacc0b589291b19 Mon Sep 17 00:00:00 2001 From: Damien Diederen Date: Thu, 17 Jun 2021 15:40:34 +0200 Subject: [PATCH 03/11] ZOOKEEPER-4306: Test serialized String measurement --- .../server/util/SerializeUtilsTest.java | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/SerializeUtilsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/SerializeUtilsTest.java index 4e2421d13de..54b113d05bd 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/SerializeUtilsTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/SerializeUtilsTest.java @@ -19,6 +19,7 @@ package org.apache.zookeeper.server.util; import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.mockito.ArgumentMatchers.any; @@ -30,6 +31,9 @@ import static org.mockito.Mockito.verify; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import org.apache.jute.BinaryInputArchive; import org.apache.jute.BinaryOutputArchive; import org.apache.jute.OutputArchive; import org.apache.jute.Record; @@ -118,4 +122,31 @@ public Object answer(InvocationOnMock invocation) throws Throwable { assertArrayEquals(baos.toByteArray(), data); } + private void testSerializeStringSizeWith(String s, int expectedLength) throws IOException { + assertEquals(expectedLength, BinaryOutputArchive.serializedStringSize(s)); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); + boa.writeString(s, "test"); + baos.close(); + + assertEquals(expectedLength, baos.size()); + } + + @Test + public void testSerializeStringSize() throws IOException { + testSerializeStringSizeWith("", 4); + testSerializeStringSizeWith("test", 8); + + byte[] bytes = new byte[BinaryInputArchive.maxBuffer - 4]; + Arrays.fill(bytes, (byte) 'x'); + testSerializeStringSizeWith(new String(bytes, StandardCharsets.US_ASCII), BinaryInputArchive.maxBuffer); + + testSerializeStringSizeWith("-\u00f4-", 8); + testSerializeStringSizeWith("-\u0939-", 9); + + // Note: 12, not 10, because BinaryOutputArchive's + // stringToByteBuffer encodes each 'char' individually. + testSerializeStringSizeWith("-\ud800\udf48-", 12); + } } From c4904ceb16fb879084adae643494fdf2490a5b1e Mon Sep 17 00:00:00 2001 From: Damien Diederen Date: Thu, 17 Jun 2021 11:35:02 +0200 Subject: [PATCH 04/11] ZOOKEEPER-4306: PathUtils: Add serialized path measurement API --- .../org/apache/zookeeper/common/PathUtils.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/common/PathUtils.java b/zookeeper-server/src/main/java/org/apache/zookeeper/common/PathUtils.java index 7ddc5576db6..c8a1ced60d3 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/common/PathUtils.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/common/PathUtils.java @@ -18,6 +18,8 @@ package org.apache.zookeeper.common; +import org.apache.jute.BinaryOutputArchive; + /** * Path related utilities */ @@ -123,4 +125,16 @@ public static String getTopNamespace(final String path) { final String[] parts = path.split("/"); return parts.length > 1 ? parts[1] : null; } + + /** + * Computes the byte size of a path {@code path} as serialized + * into a transaction. + * + * @param path the path + * @return the size in bytes + * @throws ArithmeticException if the result overflows an int + */ + public static int serializedSize(String path) { + return BinaryOutputArchive.serializedStringSize(path); + } } From 6339fc21db4424061a65e0e8c162b7c591eccb25 Mon Sep 17 00:00:00 2001 From: Damien Diederen Date: Thu, 10 Jun 2021 15:27:42 +0200 Subject: [PATCH 05/11] ZOOKEEPER-4306: DataTree: Wrap per-session ephemeral path sets This commit does not introduce anything new; it simply refactors the ephemeral path management logic in preparation for storing additional bookkeeping information. --- .../org/apache/zookeeper/server/DataTree.java | 79 ++++++++++++------- 1 file changed, 50 insertions(+), 29 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java index 012a10e5469..460a6028f1d 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java @@ -151,7 +151,7 @@ public class DataTree { /** * This hashtable lists the paths of the ephemeral nodes of a session. */ - private final Map> ephemerals = new ConcurrentHashMap<>(); + private final Map ephemerals = new ConcurrentHashMap<>(); /** * This set contains the paths of all container nodes @@ -189,15 +189,12 @@ public class DataTree { private final DigestCalculator digestCalculator; - @SuppressWarnings("unchecked") public Set getEphemerals(long sessionId) { - HashSet ret = ephemerals.get(sessionId); - if (ret == null) { + OwnedEphemerals ownedEphemerals = ephemerals.get(sessionId); + if (ownedEphemerals == null) { return new HashSet<>(); } - synchronized (ret) { - return (HashSet) ret.clone(); - } + return ownedEphemerals.clonePaths(); } public Set getContainers() { @@ -226,8 +223,8 @@ public int getWatchCount() { public int getEphemeralsCount() { int result = 0; - for (HashSet set : ephemerals.values()) { - result += set.size(); + for (OwnedEphemerals ownedEphemerals : ephemerals.values()) { + result += ownedEphemerals.count(); } return result; } @@ -492,10 +489,8 @@ public void createNode(final String path, byte[] data, List acl, long ephem } else if (ephemeralType == EphemeralType.TTL) { ttls.add(path); } else if (ephemeralOwner != 0) { - HashSet list = ephemerals.computeIfAbsent(ephemeralOwner, k -> new HashSet<>()); - synchronized (list) { - list.add(path); - } + OwnedEphemerals ownedEphemerals = ephemerals.computeIfAbsent(ephemeralOwner, k -> new OwnedEphemerals()); + ownedEphemerals.add(path); } if (outputStat != null) { child.copyStat(outputStat); @@ -584,11 +579,9 @@ public void deleteNode(String path, long zxid) throws NoNodeException { } else if (ephemeralType == EphemeralType.TTL) { ttls.remove(path); } else if (owner != 0) { - Set nodes = ephemerals.get(owner); - if (nodes != null) { - synchronized (nodes) { - nodes.remove(path); - } + OwnedEphemerals ownedEphemerals = ephemerals.get(owner); + if (ownedEphemerals != null) { + ownedEphemerals.remove(path); } } } @@ -951,8 +944,9 @@ public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTx case OpCode.closeSession: long sessionId = header.getClientId(); if (txn != null) { + OwnedEphemerals ownedEphemerals = ephemerals.remove(sessionId); killSession(sessionId, header.getZxid(), - ephemerals.remove(sessionId), + ownedEphemerals != null ? ownedEphemerals.clonePaths() : null, ((CloseSessionTxn) txn).getPaths2Delete()); } else { killSession(sessionId, header.getZxid()); @@ -1127,7 +1121,10 @@ void killSession(long session, long zxid) { // so there is no need for synchronization. The list is not // changed here. Only create and delete change the list which // are again called from FinalRequestProcessor in sequence. - killSession(session, zxid, ephemerals.remove(session), null); + OwnedEphemerals ownedEphemerals = ephemerals.remove(session); + killSession(session, zxid, + ownedEphemerals != null ? ownedEphemerals.clonePaths() : null, + null); } void killSession(long session, long zxid, Set paths2DeleteLocal, @@ -1375,8 +1372,8 @@ public void deserialize(InputArchive ia, String tag) throws IOException { } else if (ephemeralType == EphemeralType.TTL) { ttls.add(path); } else if (owner != 0) { - HashSet list = ephemerals.computeIfAbsent(owner, k -> new HashSet<>()); - list.add(path); + OwnedEphemerals ownedEphemerals = ephemerals.computeIfAbsent(owner, k -> new OwnedEphemerals()); + ownedEphemerals.add(path); } } path = ia.readString("path"); @@ -1448,13 +1445,13 @@ public synchronized WatchesSummary getWatchesSummary() { */ public void dumpEphemerals(PrintWriter writer) { writer.println("Sessions with Ephemerals (" + ephemerals.keySet().size() + "):"); - for (Entry> entry : ephemerals.entrySet()) { + for (Entry entry : ephemerals.entrySet()) { writer.print("0x" + Long.toHexString(entry.getKey())); writer.println(":"); - Set tmp = entry.getValue(); + OwnedEphemerals tmp = entry.getValue(); if (tmp != null) { synchronized (tmp) { - for (String path : tmp) { + for (String path : tmp.clonePaths()) { writer.println("\t" + path); } } @@ -1474,10 +1471,8 @@ public void shutdownWatcher() { */ public Map> getEphemerals() { Map> ephemeralsCopy = new HashMap<>(); - for (Entry> e : ephemerals.entrySet()) { - synchronized (e.getValue()) { - ephemeralsCopy.put(e.getKey(), new HashSet<>(e.getValue())); - } + for (Entry e : ephemerals.entrySet()) { + ephemeralsCopy.put(e.getKey(), e.getValue().clonePaths()); } return ephemeralsCopy; } @@ -1950,6 +1945,32 @@ public long getDigest() { } + /** + * Holds information about the ephemeral paths associated with a + * session. Currently just a simple wrapper around {@code + * HashSet}. + */ + private static class OwnedEphemerals { + private HashSet paths = new HashSet<>(); + + @SuppressWarnings("unchecked") + public synchronized Set clonePaths() { + return (Set) paths.clone(); + } + + public synchronized int count() { + return paths.size(); + } + + public synchronized boolean add(String path) { + return paths.add(path); + } + + public synchronized boolean remove(String path) { + return paths.remove(path); + } + }; + /** * Create a node stat from the given params. * From 98fdc0e1a8a4669a61da3fa14a6108e4cc7314bb Mon Sep 17 00:00:00 2001 From: Damien Diederen Date: Thu, 10 Jun 2021 16:15:44 +0200 Subject: [PATCH 06/11] ZOOKEEPER-4306: DataTree: Track serialized size of ephemeral paths --- .../org/apache/zookeeper/server/DataTree.java | 43 +++++++++++++++++-- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java index 460a6028f1d..b78ca39ebc3 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java @@ -197,6 +197,14 @@ public Set getEphemerals(long sessionId) { return ownedEphemerals.clonePaths(); } + public int getEphemeralsSerializedSize(long sessionId) { + OwnedEphemerals ownedEphemerals = ephemerals.get(sessionId); + if (ownedEphemerals == null) { + return OwnedEphemerals.MIN_SERIALIZED_SIZE; + } + return ownedEphemerals.getSerializedSize(); + } + public Set getContainers() { return new HashSet<>(containers); } @@ -1951,8 +1959,13 @@ public long getDigest() { * HashSet}. */ private static class OwnedEphemerals { + // Serialization starts with a vector length descriptor. + public static final int MIN_SERIALIZED_SIZE = 4; + private HashSet paths = new HashSet<>(); + private int serializedSize = MIN_SERIALIZED_SIZE; + @SuppressWarnings("unchecked") public synchronized Set clonePaths() { return (Set) paths.clone(); @@ -1962,12 +1975,34 @@ public synchronized int count() { return paths.size(); } - public synchronized boolean add(String path) { - return paths.add(path); + public boolean add(String path) { + int pathSerSize = PathUtils.serializedSize(path); + + synchronized (this) { + int newSerSize = Math.addExact(serializedSize, pathSerSize); + boolean result = paths.add(path); + if (result) { + serializedSize = newSerSize; + } + return result; + } + } + + public boolean remove(String path) { + int pathSerSize = PathUtils.serializedSize(path); + + synchronized (this) { + int newSerSize = Math.subtractExact(serializedSize, pathSerSize); + boolean result = paths.remove(path); + if (result) { + serializedSize = newSerSize; + } + return result; + } } - public synchronized boolean remove(String path) { - return paths.remove(path); + public synchronized int getSerializedSize() { + return serializedSize; } }; From da718a353dc2c8598a0d46780af318b90200a733 Mon Sep 17 00:00:00 2001 From: Damien Diederen Date: Thu, 17 Jun 2021 16:33:09 +0200 Subject: [PATCH 07/11] ZOOKEEPER-4306: Test DataTree serialization size --- .../apache/zookeeper/server/DataTreeTest.java | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java index 13276d855c5..869521cdc77 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/DataTreeTest.java @@ -32,7 +32,9 @@ import java.io.PrintWriter; import java.io.StringWriter; import java.lang.reflect.Field; +import java.util.ArrayList; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -49,6 +51,7 @@ import org.apache.zookeeper.common.PathTrie; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.metrics.MetricsUtils; +import org.apache.zookeeper.txn.CloseSessionTxn; import org.apache.zookeeper.txn.CreateTxn; import org.apache.zookeeper.txn.TxnHeader; import org.junit.jupiter.api.Test; @@ -671,4 +674,37 @@ private void testSerializeLastProcessedZxid(boolean enableForSerialize, boolean } } + private void testEphemeralsSerializedSizeWith(String[] paths) throws Exception { + DataTree dataTree = new DataTree(); + long ephemeralOwner = 42; + + for (String path : paths) { + int lastSlash = path.lastIndexOf('/'); + String parentPath = path.substring(0, lastSlash); + DataNode parent = dataTree.getNode(parentPath); + + dataTree.createNode(path, null, null, ephemeralOwner, parent.stat.getCversion() + 1, 1, 1); + } + + int size = dataTree.getEphemeralsSerializedSize(ephemeralOwner); + Set ephemerals = dataTree.getEphemerals(ephemeralOwner); + + CloseSessionTxn txn = new CloseSessionTxn(new ArrayList(ephemerals)); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); + txn.serialize(boa, "txn"); + baos.close(); + + assertEquals(size, baos.size()); + } + + @Test + public void testEphemeralsSerializedSize() throws Exception { + testEphemeralsSerializedSizeWith(new String[] { "/test" }); + + testEphemeralsSerializedSizeWith(new String[] { "/a", "/b", "/c" }); + + testEphemeralsSerializedSizeWith(new String[] { "/a", "/\u00f4", "/\u0939", "/\ud800\udf48" }); + } } From e1287651cc88fb4f23ad171d740d995bc0b2ec2e Mon Sep 17 00:00:00 2001 From: Damien Diederen Date: Thu, 17 Jun 2021 10:00:41 +0200 Subject: [PATCH 08/11] ZOOKEEPER-4306: PrepRequestProcessor: Prevent CloseSessionTxn "overflow" --- .../server/PrepRequestProcessor.java | 45 +++++++++++++++---- 1 file changed, 37 insertions(+), 8 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java index 961948a9ca6..441d35c04a1 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java @@ -30,6 +30,7 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; +import org.apache.jute.BinaryInputArchive; import org.apache.jute.Record; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.DeleteContainerRequest; @@ -680,6 +681,15 @@ private void pRequest2TxnCreate(int type, Request request, Record record) throws } int newCversion = parentRecord.stat.getCversion() + 1; zks.checkQuota(path, null, data, OpCode.create); + long ephemeralOwner = 0; + if (createMode.isContainer()) { + ephemeralOwner = EphemeralType.CONTAINER_EPHEMERAL_OWNER; + } else if (createMode.isTTL()) { + ephemeralOwner = EphemeralType.TTL.toEphemeralOwner(ttl); + } else if (createMode.isEphemeral()) { + checkCloseSessionTxnSize(path, request.sessionId); + ephemeralOwner = request.sessionId; + } if (type == OpCode.createContainer) { request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion)); } else if (type == OpCode.createTTL) { @@ -689,14 +699,6 @@ private void pRequest2TxnCreate(int type, Request request, Record record) throws } TxnHeader hdr = request.getHdr(); - long ephemeralOwner = 0; - if (createMode.isContainer()) { - ephemeralOwner = EphemeralType.CONTAINER_EPHEMERAL_OWNER; - } else if (createMode.isTTL()) { - ephemeralOwner = EphemeralType.TTL.toEphemeralOwner(ttl); - } else if (createMode.isEphemeral()) { - ephemeralOwner = request.sessionId; - } StatPersisted s = DataTree.createStat(hdr.getZxid(), hdr.getTime(), ephemeralOwner); parentRecord = parentRecord.duplicate(request.getHdr().getZxid()); parentRecord.childCount++; @@ -749,6 +751,33 @@ private static int checkAndIncVersion(int currentVersion, int expectedVersion, S } } + private void checkCloseSessionTxnSize(String path, long sessionId) throws KeeperException.MarshallingErrorException { + int size = PathUtils.serializedSize(path); + + List outstandingPaths = new ArrayList<>(); + DataTree dataTree = zks.getZKDatabase().getDataTree(); + synchronized (zks.outstandingChanges) { + size = Math.addExact(size, dataTree.getEphemeralsSerializedSize(sessionId)); + for (ChangeRecord c : zks.outstandingChanges) { + // Ignoring deleted nodes and existing ephemerals means that we might be + // overcounting. + if (c.stat != null && c.stat.getEphemeralOwner() == sessionId) { + outstandingPaths.add(c.path); + } + } + } + + for (String outstandingPath : outstandingPaths) { + size = Math.addExact(size, PathUtils.serializedSize(outstandingPath)); + } + + if (size > BinaryInputArchive.maxBuffer) { + LOG.info("Rejecting ephemeral path {} as it would overflow session 0x{}", + path, Long.toHexString(sessionId)); + throw new KeeperException.MarshallingErrorException(); + } + } + /** * This method will be called inside the ProcessRequestThread, which is a * singleton, so there will be a single thread calling this code. From 053f8061033e1a6169f41a7987ffda065b352faf Mon Sep 17 00:00:00 2001 From: Damien Diederen Date: Thu, 17 Jun 2021 14:52:25 +0200 Subject: [PATCH 09/11] ZOOKEEPER-4306: Introduce TooManyEphemeralsException --- .../org/apache/zookeeper/KeeperException.java | 21 ++++++++++++++++++- .../zookeeper/cli/CliWrapperException.java | 2 ++ .../server/PrepRequestProcessor.java | 4 ++-- 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java b/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java index 0516892abfa..376af2c08be 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/KeeperException.java @@ -151,6 +151,8 @@ public static KeeperException create(Code code) { return new QuotaExceededException(); case THROTTLEDOP: return new ThrottledOpException(); + case TOOMANYEPHEMERALS: + return new TooManyEphemeralsException(); case OK: default: throw new IllegalArgumentException("Invalid exception code:" + code.code); @@ -415,7 +417,9 @@ public enum Code implements CodeDeprecated { /** Operation was throttled and not executed at all. This error code indicates that zookeeper server * is under heavy load and can't process incoming requests at full speed; please retry with back off. */ - THROTTLEDOP (-127); + THROTTLEDOP (-127), + /** Adding an ephemeral with the requested path could overflow transaction size. */ + TOOMANYEPHEMERALS(-128); private static final Map lookup = new HashMap<>(); @@ -514,6 +518,8 @@ static String getCodeMessage(Code code) { return "Quota has exceeded"; case THROTTLEDOP: return "Op throttled due to high load"; + case TOOMANYEPHEMERALS: + return "Adding an ephemeral with the requested path could overflow transaction size"; default: return "Unknown error " + code; } @@ -980,4 +986,17 @@ public ThrottledOpException() { super(Code.THROTTLEDOP); } } + + /** + * @see Code#TOOMANYEPHEMERALS + */ + @InterfaceAudience.Public + public static class TooManyEphemeralsException extends KeeperException { + public TooManyEphemeralsException() { + super(Code.TOOMANYEPHEMERALS); + } + public TooManyEphemeralsException(String path) { + super(Code.TOOMANYEPHEMERALS, path); + } + } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/cli/CliWrapperException.java b/zookeeper-server/src/main/java/org/apache/zookeeper/cli/CliWrapperException.java index aa1bf44a54c..c4886b30975 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/cli/CliWrapperException.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/cli/CliWrapperException.java @@ -56,6 +56,8 @@ private static String getMessage(Throwable cause) { + "new servers are connected and synced"; } else if (keeperException instanceof KeeperException.QuotaExceededException) { return "Quota has exceeded : " + keeperException.getPath(); + } else if (keeperException instanceof KeeperException.TooManyEphemeralsException) { + return "Adding ephemeral could overflow transaction size : " + keeperException.getPath(); } } return cause.getMessage(); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java index 441d35c04a1..06587ba243d 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java @@ -751,7 +751,7 @@ private static int checkAndIncVersion(int currentVersion, int expectedVersion, S } } - private void checkCloseSessionTxnSize(String path, long sessionId) throws KeeperException.MarshallingErrorException { + private void checkCloseSessionTxnSize(String path, long sessionId) throws KeeperException.TooManyEphemeralsException { int size = PathUtils.serializedSize(path); List outstandingPaths = new ArrayList<>(); @@ -774,7 +774,7 @@ private void checkCloseSessionTxnSize(String path, long sessionId) throws Keeper if (size > BinaryInputArchive.maxBuffer) { LOG.info("Rejecting ephemeral path {} as it would overflow session 0x{}", path, Long.toHexString(sessionId)); - throw new KeeperException.MarshallingErrorException(); + throw new KeeperException.TooManyEphemeralsException(path); } } From 73df6d79c1f32decac720a4ccfcd5d9dae20675b Mon Sep 17 00:00:00 2001 From: Damien Diederen Date: Thu, 17 Jun 2021 17:58:39 +0200 Subject: [PATCH 10/11] ZOOKEEPER-4306: Test CloseSessionTxn "overflow" protection --- .../server/CloseSessionTxnSizeTest.java | 134 ++++++++++++++++++ 1 file changed, 134 insertions(+) create mode 100644 zookeeper-server/src/test/java/org/apache/zookeeper/server/CloseSessionTxnSizeTest.java diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/CloseSessionTxnSizeTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/CloseSessionTxnSizeTest.java new file mode 100644 index 00000000000..04d8134bc24 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/CloseSessionTxnSizeTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Set; +import org.apache.jute.BinaryInputArchive; +import org.apache.jute.BinaryOutputArchive; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.test.ClientBase; +import org.apache.zookeeper.txn.CloseSessionTxn; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class CloseSessionTxnSizeTest extends ClientBase { + + private static final String JUTE_MAXBUFFER = "jute.maxbuffer"; + + private static final int JUTE_MAXBUFFER_VALUE = 256; + + private String previousMaxBuffer; + + private ZooKeeper zk; + + @BeforeEach + @Override + public void setUp() throws Exception { + previousMaxBuffer = System.setProperty(JUTE_MAXBUFFER, Integer.toString(JUTE_MAXBUFFER_VALUE)); + assertEquals(JUTE_MAXBUFFER_VALUE, BinaryInputArchive.maxBuffer, "Couldn't set jute.maxbuffer!"); + super.setUp(); + zk = createClient(); + } + + @AfterEach + @Override + public void tearDown() throws Exception { + super.tearDown(); + zk.close(); + if (previousMaxBuffer == null) { + System.clearProperty(JUTE_MAXBUFFER); + } else { + System.setProperty(JUTE_MAXBUFFER, previousMaxBuffer); + } + } + + private static String makePath(int length) { + byte[] bytes = new byte[length]; + Arrays.fill(bytes, (byte) 'x'); + bytes[0] = (byte) '/'; + + return new String(bytes, StandardCharsets.US_ASCII); + } + + @Test + public void testCloseSessionTxnSizeFit() throws InterruptedException, IOException, KeeperException { + // 4 bytes for vector length, 4 bytes for string length + String path = makePath((JUTE_MAXBUFFER_VALUE - 4) / 2 - 4 - 1); + + zk.create(path + "x", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + zk.create(path + "y", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + + Set paths = serverFactory.getZooKeeperServer() + .getZKDatabase().getDataTree().getEphemerals(zk.getSessionId()); + + // Ensure we are looking at the right session. + assertEquals(2, paths.size(), "Ephemerals count"); + + CloseSessionTxn txn = new CloseSessionTxn(new ArrayList(paths)); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + BinaryOutputArchive oa = BinaryOutputArchive.getArchive(baos); + oa.writeRecord(txn, "CloseSessionTxn"); + + // Check that our encoding assumptions hold. + assertEquals(baos.size(), JUTE_MAXBUFFER_VALUE, "CloseSessionTxn size"); + } + + @Test + public void testCloseSessionTxnSizeOverflow() throws KeeperException, InterruptedException { + String path = makePath((JUTE_MAXBUFFER_VALUE - 4) / 2 - 4 - 1); + + zk.create(path + "x", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + + try { + zk.create(path + "yz", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + fail(); + } catch (KeeperException.TooManyEphemeralsException e) { + // Expected + } + } + + @Test + public void testCloseSessionTxnSizeSequential() throws KeeperException, InterruptedException { + String prefix = "/test-"; + String specimen = prefix + "0123456789"; + + int nOk = (JUTE_MAXBUFFER_VALUE - 4) / (specimen.length() + 4); + for (int i = 0; i < nOk; i++) { + zk.create(prefix, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); + } + + try { + zk.create(prefix, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); + fail(); + } catch (KeeperException.TooManyEphemeralsException e) { + // Expected + } + } +} From 1dbd423fc08ab5ed86463ff352be19d0a1fff6c2 Mon Sep 17 00:00:00 2001 From: Damien Diederen Date: Thu, 17 Jun 2021 15:05:11 +0200 Subject: [PATCH 11/11] ZOOKEEPER-4306: C client: Add ZTOOMANYEPHEMERALS code & message --- zookeeper-client/zookeeper-client-c/include/zookeeper.h | 3 ++- zookeeper-client/zookeeper-client-c/src/zookeeper.c | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/zookeeper-client/zookeeper-client-c/include/zookeeper.h b/zookeeper-client/zookeeper-client-c/include/zookeeper.h index 044250a3098..52b4b22f495 100644 --- a/zookeeper-client/zookeeper-client-c/include/zookeeper.h +++ b/zookeeper-client/zookeeper-client-c/include/zookeeper.h @@ -141,7 +141,8 @@ enum ZOO_ERRORS { ZNOWATCHER = -121, /*!< The watcher couldn't be found */ ZRECONFIGDISABLED = -123, /*!< Attempts to perform a reconfiguration operation when reconfiguration feature is disabled */ ZSESSIONCLOSEDREQUIRESASLAUTH = -124, /*!< The session has been closed by server because server requires client to do authentication via configured authentication scheme at server, but client is not configured with required authentication scheme or configured but failed (i.e. wrong credential used.). */ - ZTHROTTLEDOP = -127 /*!< Operation was throttled and not executed at all. please, retry! */ + ZTHROTTLEDOP = -127, /*!< Operation was throttled and not executed at all. please, retry! */ + ZTOOMANYEPHEMERALS = -128 /*!< Adding an ephemeral with the requested path could overflow transaction size */ /* when adding/changing values here also update zerror(int) to return correct error message */ }; diff --git a/zookeeper-client/zookeeper-client-c/src/zookeeper.c b/zookeeper-client/zookeeper-client-c/src/zookeeper.c index a19144b5028..075c6aa9c87 100644 --- a/zookeeper-client/zookeeper-client-c/src/zookeeper.c +++ b/zookeeper-client/zookeeper-client-c/src/zookeeper.c @@ -5051,6 +5051,8 @@ const char* zerror(int c) return "session closed by server because client is required to do SASL authentication"; case ZTHROTTLEDOP: return "Operation was throttled due to high load"; + case ZTOOMANYEPHEMERALS: + return "Adding an ephemeral with the requested path could overflow transaction size"; } if (c > 0) { return strerror(c);