From b6c80256e349754cc5ff061cb5cccc2380a1b314 Mon Sep 17 00:00:00 2001
From: Branimir Lambov
Date: Wed, 28 Jun 2023 15:23:46 +0300
Subject: [PATCH] Review comments

---
 .../compaction/UnifiedCompactionStrategy.java | 55 ++++++++++++++-----
 .../unified/ShardedMultiWriter.java           |  8 +--
 .../apache/cassandra/dht/SplitterTest.java    | 13 ++++-
 3 files changed, 53 insertions(+), 23 deletions(-)

diff --git a/src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java
index f523199d3ae8..95e901b7b80b 100644
--- a/src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java
@@ -433,7 +433,7 @@ protected synchronized Set getSSTables()
     }
 
     /**
-     * @return a LinkedHashMap of arenas with buckets where order of arenas are preserved
+     * @return a list of the levels in the compaction hierarchy
      */
     @VisibleForTesting
     List<Level> getLevels()
@@ -442,14 +442,14 @@ List getLevels()
     }
 
     /**
-     * Groups the sstables passed in into arenas and buckets. This is used by the strategy to determine
-     * new compactions, and by external tools in CNDB to analyze the strategy decisions.
+     * Groups the sstables passed in into levels. This is used by the strategy to determine
+     * new compactions, and by external tools to analyze the strategy decisions.
      *
-     * @param sstables a collection of the sstables to be assigned to arenas
+     * @param sstables a collection of the sstables to be assigned to levels
      * @param compactionFilter a filter to exclude CompactionSSTables,
      *                         e.g., {@link #isSuitableForCompaction}
      *
-     * @return a map of arenas to their buckets
+     * @return a list of the levels in the compaction hierarchy
      */
     public List<Level> getLevels(Collection<SSTableReader> sstables,
                                  Predicate<SSTableReader> compactionFilter)
@@ -464,13 +464,13 @@ private List formLevels(List suitable)
         List<Level> levels = new ArrayList<>(MAX_LEVELS);
         suitable.sort(shardManager::compareByDensity);
 
-        double maxSize = controller.getMaxLevelDensity(0, controller.getBaseSstableSize(controller.getFanout(0)) / shardManager.localSpaceCoverage());
+        double maxDensity = controller.getMaxLevelDensity(0, controller.getBaseSstableSize(controller.getFanout(0)) / shardManager.localSpaceCoverage());
         int index = 0;
-        Level level = new Level(controller, index, 0, maxSize);
+        Level level = new Level(controller, index, 0, maxDensity);
         for (SSTableReader candidate : suitable)
         {
-            final double size = shardManager.density(candidate);
-            if (size < level.max)
+            final double density = shardManager.density(candidate);
+            if (density < level.max)
             {
                 level.add(candidate);
                 continue;
@@ -482,10 +482,10 @@
             while (true)
             {
                 ++index;
-                double minSize = maxSize;
-                maxSize = controller.getMaxLevelDensity(index, minSize);
-                level = new Level(controller, index, minSize, maxSize);
-                if (size < level.max)
+                double minDensity = maxDensity;
+                maxDensity = controller.getMaxLevelDensity(index, minDensity);
+                level = new Level(controller, index, minDensity, maxDensity);
+                if (density < level.max)
                 {
                     level.add(candidate);
                     break;
@@ -549,7 +549,8 @@ public static class Level
         final int threshold; // number of SSTables that trigger a compaction
         final double min; // min density of sstables for this level
         final double max; // max density of sstables for this level
-        int maxOverlap = -1; // maximum number of overlapping sstables
+        int maxOverlap = -1; // maximum number of overlapping sstables, i.e. maximum number of sstables that need
+                             // to be queried on this level for any given key
 
         Level(Controller controller, int index, double minSize, double maxSize)
         {
@@ -585,7 +586,13 @@ void complete()
         }
 
         /**
-         * Return the compaction pick
+         * Return the compaction pick for this level.
+         *
+         * This is done by splitting the level into buckets that we can treat as independent regions for compaction.
+         * We then use the maxOverlap value (i.e. the maximum number of sstables that can contain data for any covered
+         * key) of each bucket to determine if compactions are needed, and to prioritize the buckets that contribute
+         * most to the complexity of queries: if maxOverlap is below the level's threshold, no compaction is needed;
+         * otherwise, we choose one from the buckets that have the highest maxOverlap.
          */
         CompactionPick getCompactionPick(SelectionContext context)
         {
@@ -629,6 +636,24 @@ CompactionPick getCompactionPick(SelectionContext context)
             return selected;
         }
 
+        /**
+         * Group the sstables in this level into buckets.
+         *
+         * The buckets are formed by grouping sstables that overlap at some key together, and then expanded to cover
+         * any overlapping sstable according to the overlap inclusion method. With the usual TRANSITIVE method this
+         * results in non-overlapping buckets that can't affect one another and can be compacted in parallel without
+         * any loss of efficiency.
+         *
+         * Other overlap inclusion methods are provided to cover situations where we may be okay with compacting
+         * sstables partially and doing more than the strictly necessary amount of compaction to solve a problem: e.g.
+         * after an upgrade from LCS where transitive overlap may cause a complete level to be compacted together
+         * (creating an operation that will take a very long time to complete) and we want to make some progress as
+         * quickly as possible at the cost of redoing some work.
+         *
+         * The number of sstables that overlap at some key defines the "overlap" of a set of sstables. The maximum such
+         * value in the bucket is its "maxOverlap", i.e. the highest number of sstables we need to read to find the
+         * data associated with a given key.
+         */
         @VisibleForTesting
         List getBuckets(SelectionContext context)
         {
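The bucket and maxOverlap definitions added in the javadoc above can be made concrete with a small, self-contained sketch. This is not part of the patch: Interval and the grouping below are simplified stand-ins for the strategy's real sstable metadata and overlap-inclusion handling.

// Illustrative sketch only, not part of the patch: simplified stand-ins showing how
// transitively overlapping sstable ranges form independent buckets and how a bucket's
// maxOverlap (the largest number of sstables containing some common key) is computed.
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class OverlapBucketsSketch
{
    // Inclusive key range covered by one sstable; a stand-in for the real sstable bounds.
    record Interval(long first, long last) {}

    public static void main(String[] args)
    {
        List<Interval> sorted = new ArrayList<>(List.of(new Interval(0, 30),
                                                        new Interval(10, 40),
                                                        new Interval(35, 60),
                                                        new Interval(100, 120)));
        sorted.sort(Comparator.comparingLong(Interval::first));

        // TRANSITIVE inclusion: a new bucket starts only when the next interval begins after
        // everything seen so far has ended, so buckets cannot affect one another and can be
        // compacted in parallel.
        List<List<Interval>> buckets = new ArrayList<>();
        long bucketEnd = Long.MIN_VALUE;
        for (Interval i : sorted)
        {
            if (buckets.isEmpty() || i.first() > bucketEnd)
                buckets.add(new ArrayList<>());
            buckets.get(buckets.size() - 1).add(i);
            bucketEnd = Math.max(bucketEnd, i.last());
        }

        for (List<Interval> bucket : buckets)
        {
            // maxOverlap: the maximum is always reached at some interval's start point,
            // so it is enough to count, for each start, how many intervals contain it.
            int maxOverlap = 0;
            for (Interval a : bucket)
            {
                int open = 0;
                for (Interval b : bucket)
                    if (b.first() <= a.first() && a.first() <= b.last())
                        ++open;
                maxOverlap = Math.max(maxOverlap, open);
            }
            // A compaction would only be considered once maxOverlap reaches the level's
            // threshold, and buckets with the highest maxOverlap would be picked first.
            System.out.println(bucket + " -> maxOverlap " + maxOverlap);
        }
    }
}

With the sample intervals above, the first three ranges overlap transitively and form one bucket with maxOverlap 2, while the last range forms a bucket of its own; only buckets whose maxOverlap reaches the level's threshold would trigger a compaction.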

diff --git a/test/unit/org/apache/cassandra/dht/SplitterTest.java b/test/unit/org/apache/cassandra/dht/SplitterTest.java
index 230427e50e90..1de22ff8fc69 100644
--- a/test/unit/org/apache/cassandra/dht/SplitterTest.java
+++ b/test/unit/org/apache/cassandra/dht/SplitterTest.java
@@ -90,16 +90,23 @@ public void testWithWeight()
         List<Splitter.WeightedRange> ranges = new ArrayList<>();
         ranges.add(new Splitter.WeightedRange(1.0, t(0, 10)));
         ranges.add(new Splitter.WeightedRange(1.0, t(20, 30)));
-        ranges.add(new Splitter.WeightedRange(0.5, t(40, 60)));
+        ranges.add(new Splitter.WeightedRange(1.0, t(40, 50)));
 
-        List<Splitter.WeightedRange> ranges2 = new ArrayList<>();
+        List<Splitter.WeightedRange> ranges2 = new ArrayList<>(); // same total coverage, split point inside weight-1 range
         ranges2.add(new Splitter.WeightedRange(1.0, t(0, 10)));
         ranges2.add(new Splitter.WeightedRange(1.0, t(20, 30)));
-        ranges2.add(new Splitter.WeightedRange(1.0, t(40, 50)));
+        ranges2.add(new Splitter.WeightedRange(0.5, t(40, 60)));
+
+        List<Splitter.WeightedRange> ranges3 = new ArrayList<>(); // same total coverage, split point inside weight-0.5 range
+        ranges3.add(new Splitter.WeightedRange(1.0, t(0, 10)));
+        ranges3.add(new Splitter.WeightedRange(0.5, t(15, 35)));
+        ranges3.add(new Splitter.WeightedRange(1.0, t(40, 50)));
+
         IPartitioner partitioner = Murmur3Partitioner.instance;
         Splitter splitter = partitioner.splitter().get();
         assertEquals(splitter.splitOwnedRanges(2, ranges, false), splitter.splitOwnedRanges(2, ranges2, false));
+        assertEquals(splitter.splitOwnedRanges(2, ranges, false), splitter.splitOwnedRanges(2, ranges3, false));
     }
 
     @Test
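As a worked check of the test's intent (again not part of the patch): each of the three range lists covers a weighted total of 30 tokens, so an even two-way split should land on an equivalent boundary for all of them, whether the split point falls inside a weight-1.0 or a weight-0.5 range. The sketch below does the arithmetic with plain longs standing in for Murmur3 tokens.

// Worked example, not the Splitter implementation: all three weighted range lists from the
// test cover a weighted total of 30, so an even two-way split lands at the same point
// (token 25 here) whether it falls inside a weight-1.0 or a weight-0.5 range.
import java.util.List;

public class WeightedSplitSketch
{
    record WeightedRange(double weight, long left, long right)
    {
        double weightedSize() { return weight * (right - left); }
    }

    public static void main(String[] args)
    {
        List<List<WeightedRange>> lists = List.of(
            List.of(new WeightedRange(1.0, 0, 10), new WeightedRange(1.0, 20, 30), new WeightedRange(1.0, 40, 50)),
            List.of(new WeightedRange(1.0, 0, 10), new WeightedRange(1.0, 20, 30), new WeightedRange(0.5, 40, 60)),
            List.of(new WeightedRange(1.0, 0, 10), new WeightedRange(0.5, 15, 35), new WeightedRange(1.0, 40, 50)));

        for (List<WeightedRange> ranges : lists)
        {
            double total = ranges.stream().mapToDouble(WeightedRange::weightedSize).sum(); // 30 for all three lists
            double half = total / 2;                                                       // weighted size each part should own
            double covered = 0;
            for (WeightedRange r : ranges)
            {
                if (covered + r.weightedSize() >= half)
                {
                    // The boundary falls inside this range; undo the weighting to find the token.
                    System.out.println("split at " + (r.left() + Math.round((half - covered) / r.weight())));
                    break;
                }
                covered += r.weightedSize();
            }
        }
    }
}

All three lists print the same split point, which is the equivalence the added assertEquals calls in the test are meant to exercise.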