1 change: 1 addition & 0 deletions — CHANGELOG.md

@@ -42,6 +42,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Implement GRPC GeoBoundingBox, GeoDistance queries ([#19451](https://github.com/opensearch-project/OpenSearch/pull/19451))
 - Implement GRPC Ids, Range, and Terms Set queries ([#19448](https://github.com/opensearch-project/OpenSearch/pull/19448))
 - Implement GRPC Nested query ([#19453](https://github.com/opensearch-project/OpenSearch/pull/19453))
+- Add sub aggregation support for histogram aggregation using skiplist ([#19438](https://github.com/opensearch-project/OpenSearch/pull/19438))
 - Optimization in String Terms Aggregation query for Large Bucket Counts ([#18732](https://github.com/opensearch-project/OpenSearch/pull/18732))
 
 ### Changed
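
For context on the new CHANGELOG entry: a date_histogram no longer has to be leaf-only for the skip-list optimization to apply. A minimal sketch of the kind of request that now qualifies, using the OpenSearch Java client aggregation builders (index, field, and aggregation names are illustrative, not from the PR):

    import org.opensearch.search.aggregations.AggregationBuilders;
    import org.opensearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
    import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval;

    // A top-level date_histogram carrying a sub-aggregation. Before this change,
    // the skip-list collector was only used when the histogram had no sub-aggregations.
    DateHistogramAggregationBuilder agg = AggregationBuilders.dateHistogram("over_time")
        .field("@timestamp")  // the field still needs a doc-values skipper for the fast path
        .calendarInterval(DateHistogramInterval.DAY)
        .subAggregation(AggregationBuilders.avg("avg_latency").field("latency_ms"));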
DateHistogramAggregator.java

@@ -115,6 +115,12 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAggregator
     private final String fieldName;
     private final boolean fieldIndexSort;
 
+    // Collector usage tracking fields
+    private int noOpCollectorsUsed;
+    private int singleValuedCollectorsUsed;
+    private int multiValuedCollectorsUsed;
+    private int skipListCollectorsUsed;
+
     DateHistogramAggregator(
         String name,
         AggregatorFactories factories,
@@ -215,6 +221,7 @@ protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws IOException {
     @Override
     public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
         if (valuesSource == null) {
+            noOpCollectorsUsed++;
             return LeafBucketCollector.NO_OP_COLLECTOR;
         }
 
@@ -225,17 +232,17 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
         final SortedNumericDocValues values = valuesSource.longValues(ctx);
         final NumericDocValues singleton = DocValues.unwrapSingleton(values);
 
-        // If no subaggregations and index sorted on given field, we can use skip list based collector
-        logger.trace("Index sort field found: {}, skipper: {}", fieldIndexSort, skipper);
-        if (fieldIndexSort && skipper != null && singleton != null) {
+        if (skipper != null && singleton != null) {
             // TODO: add hard bounds support
-            if (hardBounds != null || sub == null || sub == LeafBucketCollector.NO_OP_COLLECTOR) {
-                return new HistogramSkiplistLeafCollector(singleton, skipper, preparedRounding, bucketOrds, this::incrementBucketDocCount);
+            if (hardBounds == null && parent == null) {
+                skipListCollectorsUsed++;
+                return new HistogramSkiplistLeafCollector(singleton, skipper, preparedRounding, bucketOrds, sub, this);
             }
         }
 
         if (singleton != null) {
             // Optimized path for single-valued fields
+            singleValuedCollectorsUsed++;
             return new LeafBucketCollectorBase(sub, values) {
                 @Override
                 public void collect(int doc, long owningBucketOrd) throws IOException {
@@ -248,6 +255,7 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
         }
 
         // Original path for multi-valued fields
+        multiValuedCollectorsUsed++;
         return new LeafBucketCollectorBase(sub, values) {
             @Override
             public void collect(int doc, long owningBucketOrd) throws IOException {
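
Taken together, getLeafCollector now selects one of four collectors, and the new counters record which one ran. A simplified restatement of the branch order (a hypothetical helper with booleans standing in for the real checks, not code from the PR):

    // Mirrors the selection order above; each branch corresponds to one counter.
    static String chooseCollector(boolean hasValuesSource, boolean hasSkipper, boolean singleValued,
                                  boolean hasHardBounds, boolean hasParent) {
        if (!hasValuesSource) return "no_op";
        // The skip-list path now tolerates sub-aggregations: the old gate
        // (no sub-collectors) is replaced by hardBounds == null && parent == null.
        if (hasSkipper && singleValued && !hasHardBounds && !hasParent) return "skip_list";
        if (singleValued) return "single_valued";
        return "multi_valued";
    }

The parent == null restriction presumably exists because the skip-list collector tracks a single run of buckets per segment; at the top level owningBucketOrd is always 0, which is also what the DocIdStream path below assumes.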
@@ -404,8 +412,13 @@ public void doClose() {
 
     @Override
     public void collectDebugInfo(BiConsumer<String, Object> add) {
+        super.collectDebugInfo(add);
         add.accept("total_buckets", bucketOrds.size());
         filterRewriteOptimizationContext.populateDebugInfo(add);
+        add.accept("no_op_collectors_used", noOpCollectorsUsed);
+        add.accept("single_valued_collectors_used", singleValuedCollectorsUsed);
+        add.accept("multi_valued_collectors_used", multiValuedCollectorsUsed);
+        add.accept("skip_list_collectors_used", skipListCollectorsUsed);
     }
 
     /**
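
These counters surface through collectDebugInfo, which feeds the per-aggregator debug section of the search profiler. A minimal sketch of how a BiConsumer sink of this shape is consumed (the map is an illustrative stand-in, not the profiler's actual plumbing):

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.function.BiConsumer;

    Map<String, Object> debug = new LinkedHashMap<>();
    BiConsumer<String, Object> add = debug::put;
    add.accept("skip_list_collectors_used", 1);  // what collectDebugInfo emits for a chosen collector
    // Running the search with "profile": true should show these entries, which is
    // a convenient way to verify the skip-list path actually kicked in.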
@@ -426,7 +439,8 @@ private static class HistogramSkiplistLeafCollector extends LeafBucketCollector
         private final DocValuesSkipper skipper;
         private final Rounding.Prepared preparedRounding;
         private final LongKeyedBucketOrds bucketOrds;
-        private final BiConsumer<Long, Long> incrementDocCount;
+        private final LeafBucketCollector sub;
+        private final BucketsAggregator aggregator;
 
         /**
          * Max doc ID (inclusive) up to which all docs values may map to the same bucket.
@@ -448,19 +462,25 @@ private static class HistogramSkiplistLeafCollector extends LeafBucketCollector
             DocValuesSkipper skipper,
             Rounding.Prepared preparedRounding,
             LongKeyedBucketOrds bucketOrds,
-            BiConsumer<Long, Long> incrementDocCount
+            LeafBucketCollector sub,
+            BucketsAggregator aggregator
         ) {
             this.values = values;
             this.skipper = skipper;
             this.preparedRounding = preparedRounding;
             this.bucketOrds = bucketOrds;
-            this.incrementDocCount = incrementDocCount;
+            this.sub = sub;
+            this.aggregator = aggregator;
         }
 
         @Override
-        public void setScorer(Scorable scorer) throws IOException {}
+        public void setScorer(Scorable scorer) throws IOException {
+            if (sub != null) {
+                sub.setScorer(scorer);
+            }
+        }
 
-        private void advanceSkipper(int doc) throws IOException {
+        private void advanceSkipper(int doc, long owningBucketOrd) throws IOException {
             if (doc > skipper.maxDocID(0)) {
                 skipper.advance(doc);
             }
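
Forwarding setScorer instead of swallowing it matters now that sub-aggregations can sit under this collector: score-consuming sub-aggregations read from the last Scorable they were handed. A minimal sketch of the delegation rule (a simplified wrapper, not the PR's class):

    import java.io.IOException;
    import org.apache.lucene.search.Scorable;

    // Any collector that wraps another must pass a new Scorable down,
    // otherwise the wrapped collector computes scores against a stale scorer.
    class ForwardingCollector {
        private final ForwardingCollector sub;  // null when there is nothing to wrap

        ForwardingCollector(ForwardingCollector sub) {
            this.sub = sub;
        }

        void setScorer(Scorable scorer) throws IOException {
            if (sub != null) {
                sub.setScorer(scorer);
            }
        }
    }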
@@ -485,7 +505,7 @@ private void advanceSkipper(int doc) throws IOException {
                     // All docs at this level have a value, and all values map to the same bucket.
                     upToInclusive = skipper.maxDocID(level);
                     upToSameBucket = true;
-                    upToBucketIndex = bucketOrds.add(0, maxBucket);
+                    upToBucketIndex = bucketOrds.add(owningBucketOrd, maxBucket);
                     if (upToBucketIndex < 0) {
                         upToBucketIndex = -1 - upToBucketIndex;
                     }
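
The surrounding method (partly outside this hunk) decides when a whole skipper level can be treated as one bucket: every doc in the level's range must have a value, and the level's min and max values must round to the same bucket key. A self-contained restatement of that test, with the Lucene skipper reduced to plain numbers (method and parameter names are hypothetical):

    import java.util.function.LongUnaryOperator;

    // True if a doc-ID range [minDocID, maxDocID] holding docCount docs with values
    // in [minValue, maxValue] is guaranteed to fall into a single histogram bucket.
    static boolean wholeLevelSameBucket(int minDocID, int maxDocID, int docCount,
                                        long minValue, long maxValue, LongUnaryOperator round) {
        boolean everyDocHasValue = docCount == maxDocID - minDocID + 1;
        return everyDocHasValue && round.applyAsLong(minValue) == round.applyAsLong(maxValue);
    }

When the test passes, the changed line above registers the bucket under the real owningBucketOrd instead of the previous hard-coded 0, so the fast path stays correct if the ordinal ever differs.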
@@ -497,48 +517,64 @@
 
         @Override
         public void collect(int doc, long owningBucketOrd) throws IOException {
-            collect(doc);
-        }
-
-        @Override
-        public void collect(int doc) throws IOException {
             if (doc > upToInclusive) {
-                advanceSkipper(doc);
+                advanceSkipper(doc, owningBucketOrd);
             }
 
             if (upToSameBucket) {
-                incrementDocCount.accept(upToBucketIndex, 1L);
+                aggregator.incrementBucketDocCount(upToBucketIndex, 1L);
+                sub.collect(doc, upToBucketIndex);
             } else if (values.advanceExact(doc)) {
                 final long value = values.longValue();
-                long bucketIndex = bucketOrds.add(0, preparedRounding.round(value));
+                long bucketIndex = bucketOrds.add(owningBucketOrd, preparedRounding.round(value));
                 if (bucketIndex < 0) {
                     bucketIndex = -1 - bucketIndex;
+                    aggregator.collectExistingBucket(sub, doc, bucketIndex);
+                } else {
+                    aggregator.collectBucket(sub, doc, bucketIndex);
                 }
-                incrementDocCount.accept(bucketIndex, 1L);
             }
         }
 
+        @Override
+        public void collect(int doc) throws IOException {
+            collect(doc, 0);
+        }
+
         @Override
         public void collect(DocIdStream stream) throws IOException {
+            // This will only be called if it's the top agg
            for (;;) {
                int upToExclusive = upToInclusive + 1;
                if (upToExclusive < 0) { // overflow
                    upToExclusive = Integer.MAX_VALUE;
                }
 
                if (upToSameBucket) {
-                    long count = stream.count(upToExclusive);
-                    incrementDocCount.accept(upToBucketIndex, count);
+                    if (sub == NO_OP_COLLECTOR) {
+                        // stream.count may be faster when we don't need to handle sub-aggs
+                        long count = stream.count(upToExclusive);
+                        aggregator.incrementBucketDocCount(upToBucketIndex, count);
+                    } else {
+                        final int[] count = { 0 };
+                        stream.forEach(upToExclusive, doc -> {
+                            sub.collect(doc, upToBucketIndex);
+                            count[0]++;
+                        });
+                        aggregator.incrementBucketDocCount(upToBucketIndex, count[0]);
+                    }
+
                } else {
                    stream.forEach(upToExclusive, this::collect);
                }
 
                if (stream.mayHaveRemaining()) {
-                    advanceSkipper(upToExclusive);
+                    advanceSkipper(upToExclusive, 0);
                } else {
                    break;
                }
            }
        }
 
    }
 }
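
One convention the new else branch relies on: LongKeyedBucketOrds.add returns a non-negative ordinal the first time an (owningBucketOrd, key) pair is seen and -1 - existingOrdinal afterwards. A toy stand-in for that contract (HashMap instead of the real BigArrays-backed structure; the string key is a simplification):

    import java.util.HashMap;
    import java.util.Map;

    static final Map<String, Long> ords = new HashMap<>();

    // Toy LongKeyedBucketOrds.add: >= 0 allocates a new ordinal,
    // a negative result encodes an already-known ordinal as -1 - ord.
    static long add(long owningBucketOrd, long key) {
        String composite = owningBucketOrd + ":" + key;  // simplification of the real packing
        Long existing = ords.get(composite);
        if (existing != null) {
            return -1 - existing;
        }
        long ord = ords.size();
        ords.put(composite, ord);
        return ord;
    }

This is why the rewritten collect() distinguishes the two cases: collectBucket allocates sub-aggregator state for a fresh ordinal, while collectExistingBucket only adds the doc to state that already exists. Once sub-aggregations must see every document, bumping a doc count through the old incrementDocCount consumer is no longer enough.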