
Commit

Permalink
Review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
blambov committed Jun 28, 2023
1 parent 31b290b commit b6c8025
Showing 3 changed files with 53 additions and 23 deletions.
@@ -433,7 +433,7 @@ protected synchronized Set<SSTableReader> getSSTables()
}

/**
- * @return a LinkedHashMap of arenas with buckets where order of arenas are preserved
+ * @return a list of the levels in the compaction hierarchy
*/
@VisibleForTesting
List<Level> getLevels()
@@ -442,14 +442,14 @@ List<Level> getLevels()
}

/**
- * Groups the sstables passed in into arenas and buckets. This is used by the strategy to determine
- * new compactions, and by external tools in CNDB to analyze the strategy decisions.
+ * Groups the sstables passed in into levels. This is used by the strategy to determine
+ * new compactions, and by external tools to analyze the strategy decisions.
*
- * @param sstables a collection of the sstables to be assigned to arenas
+ * @param sstables a collection of the sstables to be assigned to levels
* @param compactionFilter a filter to exclude CompactionSSTables,
* e.g., {@link #isSuitableForCompaction}
*
- * @return a map of arenas to their buckets
+ * @return a list of the levels in the compaction hierarchy
*/
public List<Level> getLevels(Collection<SSTableReader> sstables,
Predicate<SSTableReader> compactionFilter)
@@ -464,13 +464,13 @@ private List<Level> formLevels(List<SSTableReader> suitable)
List<Level> levels = new ArrayList<>(MAX_LEVELS);
suitable.sort(shardManager::compareByDensity);

- double maxSize = controller.getMaxLevelDensity(0, controller.getBaseSstableSize(controller.getFanout(0)) / shardManager.localSpaceCoverage());
+ double maxDensity = controller.getMaxLevelDensity(0, controller.getBaseSstableSize(controller.getFanout(0)) / shardManager.localSpaceCoverage());
int index = 0;
- Level level = new Level(controller, index, 0, maxSize);
+ Level level = new Level(controller, index, 0, maxDensity);
for (SSTableReader candidate : suitable)
{
- final double size = shardManager.density(candidate);
- if (size < level.max)
+ final double density = shardManager.density(candidate);
+ if (density < level.max)
{
level.add(candidate);
continue;
@@ -482,10 +482,10 @@ private List<Level> formLevels(List<SSTableReader> suitable)
while (true)
{
++index;
- double minSize = maxSize;
- maxSize = controller.getMaxLevelDensity(index, minSize);
- level = new Level(controller, index, minSize, maxSize);
- if (size < level.max)
+ double minDensity = maxDensity;
+ maxDensity = controller.getMaxLevelDensity(index, minDensity);
+ level = new Level(controller, index, minDensity, maxDensity);
+ if (density < level.max)
{
level.add(candidate);
break;
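To make the renamed loop above concrete, here is a minimal, self-contained sketch of the density-to-level mapping it implements. The growth rule (multiplying the cap by a fixed fanout) is an assumption standing in for controller.getMaxLevelDensity; the real controller may compute the bounds differently.

class LevelAssignmentSketch
{
    // Level n accepts sstables whose density falls in [maxDensity(n-1), maxDensity(n)).
    static int levelFor(double density, double baseMaxDensity, double fanout)
    {
        int index = 0;
        double max = baseMaxDensity;
        while (density >= max)
        {
            ++index;
            max *= fanout; // stand-in for controller.getMaxLevelDensity(index, min)
        }
        return index;
    }

    public static void main(String[] args)
    {
        // With a base cap of 100 and fanout 4, the level bounds are
        // [0, 100), [100, 400), [400, 1600), [1600, 6400), ...
        for (double density : new double[]{ 10, 150, 5000 })
            System.out.printf("density %.0f -> level %d%n", density, levelFor(density, 100, 4));
        // prints levels 0, 1 and 3
    }
}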
@@ -549,7 +549,8 @@ public static class Level
final int threshold; // number of SSTables that trigger a compaction
final double min; // min density of sstables for this level
final double max; // max density of sstables for this level
- int maxOverlap = -1; // maximum number of overlapping sstables
+ int maxOverlap = -1; // maximum number of overlapping sstables, i.e. maximum number of sstables that need
+ // to be queried on this level for any given key

Level(Controller controller, int index, double minSize, double maxSize)
{
@@ -585,7 +586,13 @@ void complete()
}

/**
- * Return the compaction pick
+ * Return the compaction pick for this level.
+ * <p>
+ * This is done by splitting the level into buckets that we can treat as independent regions for compaction.
+ * We then use the maxOverlap value (i.e. the maximum number of sstables that can contain data for any covered
+ * key) of each bucket to determine if compactions are needed, and to prioritize the buckets that contribute
+ * most to the complexity of queries: if maxOverlap is below the level's threshold, no compaction is needed;
+ * otherwise, we choose one of the buckets with the highest maxOverlap.
*/
CompactionPick getCompactionPick(SelectionContext context)
{
@@ -629,6 +636,24 @@ CompactionPick getCompactionPick(SelectionContext context)
return selected;
}
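The selection rule described in the new javadoc can be sketched in isolation. This is a hypothetical reduction, not the real method: Bucket is a stand-in type, and the real code builds a CompactionPick from the chosen bucket's sstables rather than returning the bucket itself.

import java.util.List;

class CompactionPickSketch
{
    static class Bucket
    {
        final int maxOverlap;
        Bucket(int maxOverlap) { this.maxOverlap = maxOverlap; }
    }

    // Returns null when no bucket reaches the threshold, i.e. no compaction is needed.
    static Bucket pick(List<Bucket> buckets, int threshold)
    {
        Bucket best = null;
        for (Bucket bucket : buckets)
            if (bucket.maxOverlap >= threshold && (best == null || bucket.maxOverlap > best.maxOverlap))
                best = bucket;
        return best;
    }

    public static void main(String[] args)
    {
        List<Bucket> buckets = List.of(new Bucket(2), new Bucket(7), new Bucket(5));
        System.out.println(pick(buckets, 4).maxOverlap); // 7: highest maxOverlap above the threshold
        System.out.println(pick(buckets, 8));            // null: all buckets below the threshold
    }
}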

+ /**
+ * Group the sstables in this level into buckets.
+ * <p>
+ * The buckets are formed by grouping together sstables that overlap at some key, and then expanded to cover
+ * any overlapping sstable according to the overlap inclusion method. With the usual TRANSITIVE method this
+ * results in non-overlapping buckets that can't affect one another and can be compacted in parallel without
+ * any loss of efficiency.
+ * <p>
+ * Other overlap inclusion methods are provided for situations where we may be okay with compacting
+ * sstables partially, doing more than the strictly necessary amount of compaction to solve a problem: e.g.,
+ * after an upgrade from LCS, transitive overlap may cause a complete level to be compacted together (an
+ * operation that would take a very long time to complete), and we may want to make some progress as quickly
+ * as possible at the cost of redoing some work.
+ * <p>
+ * The number of sstables that overlap at some key defines the "overlap" of a set of sstables. The maximum such
+ * value in the bucket is its "maxOverlap", i.e. the highest number of sstables we need to read to find the
+ * data associated with a given key.
+ */
@VisibleForTesting
List<Bucket> getBuckets(SelectionContext context)
{
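The "maxOverlap" notion defined above can be computed with a standard endpoint sweep. Below is a self-contained illustration under simplifying assumptions: sstables are reduced to inclusive [first, last] token intervals, whereas the real code works on CompactionSSTable key bounds.

import java.util.Arrays;

class MaxOverlapSketch
{
    // Maximum number of intervals containing any single point; each row is {first, last}, inclusive.
    static int maxOverlap(long[][] sstables)
    {
        long[] starts = Arrays.stream(sstables).mapToLong(s -> s[0]).sorted().toArray();
        long[] ends   = Arrays.stream(sstables).mapToLong(s -> s[1]).sorted().toArray();
        int max = 0, closed = 0;
        for (int i = 0; i < starts.length; ++i)
        {
            while (closed < ends.length && ends[closed] < starts[i])
                ++closed;                        // intervals that end before this one starts
            max = Math.max(max, i + 1 - closed); // intervals open at key starts[i]
        }
        return max;
    }

    public static void main(String[] args)
    {
        // Three sstables; at most two of them cover any given key, so maxOverlap is 2.
        System.out.println(maxOverlap(new long[][]{ {0, 10}, {5, 15}, {12, 20} }));
    }
}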
@@ -45,13 +45,11 @@

/**
* A {@link SSTableMultiWriter} that splits the output sstable at the partition boundaries of the compaction
- * shards used by {@link org.apache.cassandra.db.compaction.UnifiedCompactionStrategy} as long as the size of
- * the sstable so far is sufficiently large.
+ * shards used by {@link org.apache.cassandra.db.compaction.UnifiedCompactionStrategy}.
* <p/>
- * This is class is similar to {@link ShardedMultiWriter} but for flushing. Unfortunately
+ * This class is similar to {@link ShardedCompactionWriter} but for flushing. Unfortunately
 * we currently have two separate writer hierarchies that are not compatible and so we must
- * duplicate the functionality of splitting sstables over compaction shards if they have
- * reached a minimum size.
+ * duplicate the functionality.
*/
public class ShardedMultiWriter implements SSTableMultiWriter
{
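A heavily simplified sketch of the splitting behaviour the corrected javadoc describes: after this commit the writer starts a new output sstable whenever the incoming partition crosses the next shard boundary, with no minimum-size condition. Every name below is a hypothetical stand-in; the real class wraps SSTableWriter instances and obtains its boundaries from the shard manager.

import java.util.ArrayList;
import java.util.List;

class ShardSplittingSketch
{
    private final long[] boundaries;                            // ascending shard boundary tokens
    private int next = 0;                                       // next boundary to cross
    private final List<List<Long>> outputs = new ArrayList<>(); // each list models one output sstable
    private List<Long> current = new ArrayList<>();

    ShardSplittingSketch(long[] boundaries) { this.boundaries = boundaries; }

    void append(long token) // tokens must arrive in sorted order, as in a flush
    {
        while (next < boundaries.length && token >= boundaries[next])
        {
            ++next;
            if (!current.isEmpty()) // crossing a boundary starts a new output sstable
            {
                outputs.add(current);
                current = new ArrayList<>();
            }
        }
        current.add(token);
    }

    List<List<Long>> finish()
    {
        if (!current.isEmpty())
            outputs.add(current);
        return outputs;
    }

    public static void main(String[] args)
    {
        ShardSplittingSketch writer = new ShardSplittingSketch(new long[]{ 100, 200 });
        for (long token : new long[]{ 10, 90, 110, 150, 250 })
            writer.append(token);
        System.out.println(writer.finish()); // [[10, 90], [110, 150], [250]]
    }
}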
13 changes: 10 additions & 3 deletions test/unit/org/apache/cassandra/dht/SplitterTest.java
@@ -90,16 +90,23 @@ public void testWithWeight()
List<Splitter.WeightedRange> ranges = new ArrayList<>();
ranges.add(new Splitter.WeightedRange(1.0, t(0, 10)));
ranges.add(new Splitter.WeightedRange(1.0, t(20, 30)));
- ranges.add(new Splitter.WeightedRange(0.5, t(40, 60)));
+ ranges.add(new Splitter.WeightedRange(1.0, t(40, 50)));

- List<Splitter.WeightedRange> ranges2 = new ArrayList<>();
+ List<Splitter.WeightedRange> ranges2 = new ArrayList<>(); // same total coverage, split point inside weight-1 range
ranges2.add(new Splitter.WeightedRange(1.0, t(0, 10)));
ranges2.add(new Splitter.WeightedRange(1.0, t(20, 30)));
- ranges2.add(new Splitter.WeightedRange(1.0, t(40, 50)));
+ ranges2.add(new Splitter.WeightedRange(0.5, t(40, 60)));

+ List<Splitter.WeightedRange> ranges3 = new ArrayList<>(); // same total coverage, split point inside weight-0.5 range
+ ranges3.add(new Splitter.WeightedRange(1.0, t(0, 10)));
+ ranges3.add(new Splitter.WeightedRange(0.5, t(15, 35)));
+ ranges3.add(new Splitter.WeightedRange(1.0, t(40, 50)));

IPartitioner partitioner = Murmur3Partitioner.instance;
Splitter splitter = partitioner.splitter().get();

assertEquals(splitter.splitOwnedRanges(2, ranges, false), splitter.splitOwnedRanges(2, ranges2, false));
+ assertEquals(splitter.splitOwnedRanges(2, ranges, false), splitter.splitOwnedRanges(2, ranges3, false));
}
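The three range sets in the test produce identical splits because a WeightedRange contributes weight × span to the total coverage: all three sets cover a weighted total of 30, so a two-way split falls where the running coverage reaches 15, which is token 25 in each set. A small self-contained sketch of that arithmetic follows; plain longs stand in for Murmur3 tokens, and the class below is not the real Splitter.

class WeightedSplitSketch
{
    static class WeightedRange
    {
        final double weight;
        final long left, right;
        WeightedRange(double weight, long left, long right) { this.weight = weight; this.left = left; this.right = right; }
        double weightedSpan() { return weight * (right - left); }
    }

    // Token at which the cumulative weighted coverage reaches `target`.
    static long splitPoint(WeightedRange[] ranges, double target)
    {
        double covered = 0;
        for (WeightedRange range : ranges)
        {
            if (covered + range.weightedSpan() >= target)
                return range.left + (long) ((target - covered) / range.weight);
            covered += range.weightedSpan();
        }
        throw new IllegalArgumentException("target exceeds total coverage");
    }

    public static void main(String[] args)
    {
        WeightedRange[] ranges2 = { new WeightedRange(1.0, 0, 10), new WeightedRange(1.0, 20, 30), new WeightedRange(0.5, 40, 60) };
        WeightedRange[] ranges3 = { new WeightedRange(1.0, 0, 10), new WeightedRange(0.5, 15, 35), new WeightedRange(1.0, 40, 50) };
        System.out.println(splitPoint(ranges2, 15)); // 25, inside the weight-1 range (20, 30)
        System.out.println(splitPoint(ranges3, 15)); // 25, inside the weight-0.5 range (15, 35)
    }
}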

@Test
