diff --git a/CHANGELOG.md b/CHANGELOG.md index a2d606cdec7c0..38dfb52d0e3a3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Support pull-based ingestion message mappers and raw payload support ([#19765](https://github.com/opensearch-project/OpenSearch/pull/19765)] ### Changed +- Combine filter rewrite and skip list to optimize sub-aggregations ([#19573](https://github.com/opensearch-project/OpenSearch/pull/19573)) - Faster `terms` query creation for `keyword` field with index and docValues enabled ([#19350](https://github.com/opensearch-project/OpenSearch/pull/19350)) - Refactor to move prepareIndex and prepareDelete methods to Engine class ([#19551](https://github.com/opensearch-project/OpenSearch/pull/19551)) - Omit maxScoreCollector in SimpleTopDocsCollectorContext when concurrent segment search enabled ([#19584](https://github.com/opensearch-project/OpenSearch/pull/19584)) diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregatorBase.java b/server/src/main/java/org/opensearch/search/aggregations/AggregatorBase.java index 42ba00e9182bf..50df6a2ea736d 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregatorBase.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregatorBase.java @@ -73,6 +73,7 @@ public abstract class AggregatorBase extends Aggregator { private Map subAggregatorbyName; private final CircuitBreakerService breakerService; private long requestBytesUsed; + boolean precomputePath = false; /** * Constructs a new Aggregator. @@ -202,8 +203,13 @@ public Map metadata() { @Override public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException { - if (tryPrecomputeAggregationForLeaf(ctx)) { - throw new CollectionTerminatedException(); + try { + precomputePath = true; + if (tryPrecomputeAggregationForLeaf(ctx)) { + throw new CollectionTerminatedException(); + } + } finally { + precomputePath = false; } preGetSubLeafCollectors(ctx); final LeafBucketCollector sub = collectableSubAggregators.getLeafCollector(ctx); @@ -236,6 +242,29 @@ protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws return false; } + /** + * Returns true if this aggregator, or any of its ancestors, is currently on the precompute path + * @return true if the precompute path is active for this aggregator or an ancestor, false otherwise + */ + protected boolean isTryPrecomputePath() { + if (precomputePath) { + return true; + } else if (parent == null) { + return false; + } + Aggregator current = parent; + do { + if (current instanceof AggregatorBase base) { + if (base.precomputePath) { + return true; + } + } + current = current.parent(); + } while (current != null); + + return precomputePath; + } + @Override public final void preCollection() throws IOException { List collectors = Arrays.asList(subAggregators); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java new file mode 100644 index 0000000000000..6ca92114eef4e --- /dev/null +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/HistogramSkiplistLeafCollector.java @@ -0,0 +1,231 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.search.aggregations.bucket; + +import org.apache.lucene.index.DocValuesSkipper; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.search.DocIdStream; +import org.apache.lucene.search.Scorable; +import org.opensearch.common.Rounding; +import org.opensearch.search.aggregations.LeafBucketCollector; +import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds; + +import java.io.IOException; +import java.util.function.LongFunction; +import java.util.function.Supplier; + +/** + * Histogram collection logic using skip list. + * + * Currently, it can only handle one owningBucketOrd at a time. + * + * @opensearch.internal + */ +public class HistogramSkiplistLeafCollector extends LeafBucketCollector { + + private final NumericDocValues values; + private final DocValuesSkipper skipper; + private final LeafBucketCollector sub; + private final BucketsAggregator aggregator; + + /** + * Supplier function to get the current preparedRounding from the parent aggregator. + * This allows detection of rounding changes in AutoDateHistogramAggregator. + */ + private final LongFunction preparedRoundingSupplier; + private final java.util.function.Supplier bucketOrdsSupplier; + private final IncreaseRoundingIfNeeded increaseRoundingIfNeeded; + + /** + * Max doc ID (inclusive) up to which all docs values may map to the same + * bucket. + */ + private int upToInclusive = -1; + + /** + * Whether all docs up to {@link #upToInclusive} values map to the same bucket. + */ + private boolean upToSameBucket; + + /** + * Index in bucketOrds for docs up to {@link #upToInclusive}. + */ + private long upToBucketIndex; + + /** + * Tracks the last preparedRounding reference to detect rounding changes. + * Used for cache invalidation when AutoDateHistogramAggregator changes rounding. + */ + private Rounding.Prepared lastPreparedRounding; + + public HistogramSkiplistLeafCollector( + NumericDocValues values, + DocValuesSkipper skipper, + Rounding.Prepared preparedRounding, + LongKeyedBucketOrds bucketOrds, + LeafBucketCollector sub, + BucketsAggregator aggregator + ) { + this.values = values; + this.skipper = skipper; + this.preparedRoundingSupplier = (owningBucketOrd) -> preparedRounding; + this.bucketOrdsSupplier = () -> bucketOrds; + this.sub = sub; + this.aggregator = aggregator; + this.increaseRoundingIfNeeded = (owningBucketOrd, rounded) -> {}; + } + + /** + * Constructor that accepts a supplier for dynamic rounding (used by AutoDateHistogramAggregator). 
+ */ + public HistogramSkiplistLeafCollector( + NumericDocValues values, + DocValuesSkipper skipper, + LongFunction preparedRoundingSupplier, + Supplier bucketOrdsSupplier, + LeafBucketCollector sub, + BucketsAggregator aggregator, + IncreaseRoundingIfNeeded increaseRoundingIfNeeded + ) { + this.values = values; + this.skipper = skipper; + this.preparedRoundingSupplier = preparedRoundingSupplier; + this.bucketOrdsSupplier = bucketOrdsSupplier; + this.sub = sub; + this.aggregator = aggregator; + this.increaseRoundingIfNeeded = increaseRoundingIfNeeded; + } + + @Override + public void setScorer(Scorable scorer) throws IOException { + if (sub != null) { + sub.setScorer(scorer); + } + } + + private void advanceSkipper(int doc, long owningBucketOrd) throws IOException { + if (doc > skipper.maxDocID(0)) { + skipper.advance(doc); + } + upToSameBucket = false; + + if (skipper.minDocID(0) > doc) { + // Corner case which happens if `doc` doesn't have a value and is between two + // intervals of + // the doc-value skip index. + upToInclusive = skipper.minDocID(0) - 1; + return; + } + + upToInclusive = skipper.maxDocID(0); + + // Get current rounding from supplier + Rounding.Prepared currentRounding = preparedRoundingSupplier.apply(owningBucketOrd); + + // Now find the highest level where all docs map to the same bucket. + for (int level = 0; level < skipper.numLevels(); ++level) { + int totalDocsAtLevel = skipper.maxDocID(level) - skipper.minDocID(level) + 1; + long minBucket = currentRounding.round(skipper.minValue(level)); + long maxBucket = currentRounding.round(skipper.maxValue(level)); + + if (skipper.docCount(level) == totalDocsAtLevel && minBucket == maxBucket) { + // All docs at this level have a value, and all values map to the same bucket. + upToInclusive = skipper.maxDocID(level); + upToSameBucket = true; + upToBucketIndex = bucketOrdsSupplier.get().add(owningBucketOrd, maxBucket); + if (upToBucketIndex < 0) { + upToBucketIndex = -1 - upToBucketIndex; + } + } else { + break; + } + } + } + + @Override + public void collect(int doc, long owningBucketOrd) throws IOException { + Rounding.Prepared currentRounding = preparedRoundingSupplier.apply(owningBucketOrd); + + // Check if rounding changed (using reference equality) + // AutoDateHistogramAggregator creates a new Rounding.Prepared instance when rounding changes + if (currentRounding != lastPreparedRounding) { + upToInclusive = -1; // Invalidate cache + upToSameBucket = false; + lastPreparedRounding = currentRounding; + } + + if (doc > upToInclusive) { + advanceSkipper(doc, owningBucketOrd); + } + + if (upToSameBucket) { + aggregator.incrementBucketDocCount(upToBucketIndex, 1L); + sub.collect(doc, upToBucketIndex); + } else if (values.advanceExact(doc)) { + final long value = values.longValue(); + long rounded = currentRounding.round(value); + long bucketIndex = bucketOrdsSupplier.get().add(owningBucketOrd, rounded); + if (bucketIndex < 0) { + bucketIndex = -1 - bucketIndex; + aggregator.collectExistingBucket(sub, doc, bucketIndex); + } else { + aggregator.collectBucket(sub, doc, bucketIndex); + increaseRoundingIfNeeded.accept(owningBucketOrd, rounded); + } + } + } + + @Override + public void collect(DocIdStream stream) throws IOException { + // This will only be called if its the top agg + collect(stream, 0); + } + + @Override + public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { + for (;;) { + int upToExclusive = upToInclusive + 1; + if (upToExclusive < 0) { // overflow + upToExclusive = Integer.MAX_VALUE; + 
} + + if (upToSameBucket) { + if (sub == NO_OP_COLLECTOR) { + // stream.count maybe faster when we don't need to handle sub-aggs + long count = stream.count(upToExclusive); + aggregator.incrementBucketDocCount(upToBucketIndex, count); + } else { + final int[] count = { 0 }; + stream.forEach(upToExclusive, doc -> { + sub.collect(doc, upToBucketIndex); + count[0]++; + }); + aggregator.incrementBucketDocCount(upToBucketIndex, count[0]); + } + } else { + stream.forEach(upToExclusive, doc -> collect(doc, owningBucketOrd)); + } + + if (stream.mayHaveRemaining()) { + advanceSkipper(upToExclusive, owningBucketOrd); + } else { + break; + } + } + } + + /** + * Call back for auto date histogram + * + * @opensearch.internal + */ + public interface IncreaseRoundingIfNeeded { + void accept(long owningBucket, long rounded); + } +} diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index 63951953a2f5d..60aac6dee38aa 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -31,7 +31,10 @@ package org.opensearch.search.aggregations.bucket.histogram; +import org.apache.lucene.index.DocValues; +import org.apache.lucene.index.DocValuesSkipper; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.SortedNumericDocValues; import org.apache.lucene.search.DocIdStream; import org.apache.lucene.search.ScoreMode; @@ -52,6 +55,7 @@ import org.opensearch.search.aggregations.LeafBucketCollectorBase; import org.opensearch.search.aggregations.bucket.DeferableBucketAggregator; import org.opensearch.search.aggregations.bucket.DeferringBucketCollector; +import org.opensearch.search.aggregations.bucket.HistogramSkiplistLeafCollector; import org.opensearch.search.aggregations.bucket.MergingBucketsDeferringCollector; import org.opensearch.search.aggregations.bucket.filterrewrite.DateHistogramAggregatorBridge; import org.opensearch.search.aggregations.bucket.filterrewrite.FilterRewriteOptimizationContext; @@ -85,6 +89,7 @@ * @opensearch.internal */ abstract class AutoDateHistogramAggregator extends DeferableBucketAggregator { + static AutoDateHistogramAggregator build( String name, AggregatorFactories factories, @@ -135,6 +140,7 @@ static AutoDateHistogramAggregator build( protected final int targetBuckets; protected int roundingIdx; protected Rounding.Prepared preparedRounding; + private final String fieldName; private final FilterRewriteOptimizationContext filterRewriteOptimizationContext; @@ -157,6 +163,9 @@ private AutoDateHistogramAggregator( this.roundingInfos = roundingInfos; this.roundingPreparer = roundingPreparer; this.preparedRounding = prepareRounding(0); + this.fieldName = (valuesSource instanceof ValuesSource.Numeric.FieldData) + ? 
((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName() + : null; DateHistogramAggregatorBridge bridge = new DateHistogramAggregatorBridge() { @Override @@ -243,7 +252,11 @@ public final DeferringBucketCollector getDeferringCollector() { return deferringCollector; } - protected abstract LeafBucketCollector getLeafCollector(SortedNumericDocValues values, LeafBucketCollector sub) throws IOException; + protected abstract LeafBucketCollector getLeafCollector( + SortedNumericDocValues values, + DocValuesSkipper skipper, + LeafBucketCollector sub + ) throws IOException; @Override protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws IOException { @@ -262,7 +275,12 @@ public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBuc } final SortedNumericDocValues values = valuesSource.longValues(ctx); - final LeafBucketCollector iteratingCollector = getLeafCollector(values, sub); + DocValuesSkipper skipper = null; + if (this.fieldName != null) { + skipper = ctx.reader().getDocValuesSkipper(this.fieldName); + } + + final LeafBucketCollector iteratingCollector = getLeafCollector(values, skipper, sub); return new LeafBucketCollectorBase(sub, values) { @Override public void collect(int doc, long owningBucketOrd) throws IOException { @@ -271,12 +289,12 @@ public void collect(int doc, long owningBucketOrd) throws IOException { @Override public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { - super.collect(stream, owningBucketOrd); + iteratingCollector.collect(stream, owningBucketOrd); } @Override public void collectRange(int min, int max) throws IOException { - super.collectRange(min, max); + iteratingCollector.collectRange(min, max); } }; } @@ -370,6 +388,9 @@ private static class FromSingle extends AutoDateHistogramAggregator { private long min = Long.MAX_VALUE; private long max = Long.MIN_VALUE; + // Debug tracking counters for collector types + private int skiplistCollectorCount = 0; + FromSingle( String name, AggregatorFactories factories, @@ -402,7 +423,29 @@ protected LongKeyedBucketOrds getBucketOrds() { } @Override - protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, LeafBucketCollector sub) throws IOException { + protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, DocValuesSkipper skipper, LeafBucketCollector sub) + throws IOException { + // Check if skiplist optimization is available + final NumericDocValues singleton = DocValues.unwrapSingleton(values); + if (singleton != null && skipper != null) { + // FIXME: replace isTryPrecomputePath with collector mode + if (parent == null || isTryPrecomputePath()) { + // Increment skiplist collector count + skiplistCollectorCount++; + return new HistogramSkiplistLeafCollector( + singleton, + skipper, + (owningBucketOrd) -> preparedRounding, // FromSingle has no parent, so there is a single owning bucket ord + () -> bucketOrds, + sub, + FromSingle.this, + (owningBucket, rounded) -> increaseRoundingIfNeeded(rounded) // callback to allow rounding change + // detection via increaseRoundingIfNeeded + ); + } + } + + // Fall back to standard LeafBucketCollectorBase when skiplist unavailable return new LeafBucketCollectorBase(sub, values) { @Override public void collect(int doc, long owningBucketOrd) throws IOException { @@ -446,62 +489,63 @@ private void collectValue(int doc, long rounded) throws IOException { increaseRoundingIfNeeded(rounded); } - /** - * Examine our current bucket count and the most recently added bucket to determine if an update 
to - * preparedRounding is required to keep total bucket count in compliance with targetBuckets. - * - * @param rounded the most recently collected value rounded - */ - private void increaseRoundingIfNeeded(long rounded) { - // If we are already using the rounding with the largest interval nothing can be done - if (roundingIdx >= roundingInfos.length - 1) { - return; - } + }; + } - // Re calculate the max and min values we expect to bucket according to most recently rounded val - min = Math.min(min, rounded); - max = Math.max(max, rounded); - - /** - * Quick explanation of the two below conditions: - * - * 1. [targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval()] - * Represents the total bucket count possible before we will exceed targetBuckets - * even if we use the maximum inner interval of our current rounding. For example, consider the - * DAYS_OF_MONTH rounding where the maximum inner interval is 7 days (i.e. 1 week buckets). - * targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval() would then be the number of - * 1 day buckets possible such that if we re-bucket to 1 week buckets we will have more 1 week buckets - * than our targetBuckets limit. If the current count of buckets exceeds this limit we must update - * our rounding. - * - * 2. [targetBuckets * roundingInfos[roundingIdx].getMaximumRoughEstimateDurationMillis()] - * The total duration of ms covered by our current rounding. In the case of MINUTES_OF_HOUR rounding - * getMaximumRoughEstimateDurationMillis is 60000. If our current total range in millis (max - min) - * exceeds this range we must update our rounding. - */ - if (bucketOrds.size() <= targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval() - && max - min <= targetBuckets * roundingInfos[roundingIdx].getMaximumRoughEstimateDurationMillis()) { - return; + /** + * Examine our current bucket count and the most recently added bucket to determine if an update to + * preparedRounding is required to keep total bucket count in compliance with targetBuckets. + * + * @param rounded the most recently collected value rounded + */ + private void increaseRoundingIfNeeded(long rounded) { + // If we are already using the rounding with the largest interval nothing can be done + if (roundingIdx >= roundingInfos.length - 1) { + return; + } + + // Re calculate the max and min values we expect to bucket according to most recently rounded val + min = Math.min(min, rounded); + max = Math.max(max, rounded); + + /** + * Quick explanation of the two below conditions: + * + * 1. [targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval()] + * Represents the total bucket count possible before we will exceed targetBuckets + * even if we use the maximum inner interval of our current rounding. For example, consider the + * DAYS_OF_MONTH rounding where the maximum inner interval is 7 days (i.e. 1 week buckets). + * targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval() would then be the number of + * 1 day buckets possible such that if we re-bucket to 1 week buckets we will have more 1 week buckets + * than our targetBuckets limit. If the current count of buckets exceeds this limit we must update + * our rounding. + * + * 2. [targetBuckets * roundingInfos[roundingIdx].getMaximumRoughEstimateDurationMillis()] + * The total duration of ms covered by our current rounding. In the case of MINUTES_OF_HOUR rounding + * getMaximumRoughEstimateDurationMillis is 60000. 
If our current total range in millis (max - min) + * exceeds this range we must update our rounding. + */ + if (bucketOrds.size() <= targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval() + && max - min <= targetBuckets * roundingInfos[roundingIdx].getMaximumRoughEstimateDurationMillis()) { + return; + } + do { + try (LongKeyedBucketOrds oldOrds = bucketOrds) { + preparedRounding = prepareRounding(++roundingIdx); + long[] mergeMap = new long[Math.toIntExact(oldOrds.size())]; + bucketOrds = new LongKeyedBucketOrds.FromSingle(context.bigArrays()); + LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = oldOrds.ordsEnum(0); + while (ordsEnum.next()) { + long oldKey = ordsEnum.value(); + long newKey = preparedRounding.round(oldKey); + long newBucketOrd = bucketOrds.add(0, newKey); + mergeMap[(int) ordsEnum.ord()] = newBucketOrd >= 0 ? newBucketOrd : -1 - newBucketOrd; } - do { - try (LongKeyedBucketOrds oldOrds = bucketOrds) { - preparedRounding = prepareRounding(++roundingIdx); - long[] mergeMap = new long[Math.toIntExact(oldOrds.size())]; - bucketOrds = new LongKeyedBucketOrds.FromSingle(context.bigArrays()); - LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = oldOrds.ordsEnum(0); - while (ordsEnum.next()) { - long oldKey = ordsEnum.value(); - long newKey = preparedRounding.round(oldKey); - long newBucketOrd = bucketOrds.add(0, newKey); - mergeMap[(int) ordsEnum.ord()] = newBucketOrd >= 0 ? newBucketOrd : -1 - newBucketOrd; - } - merge(mergeMap, bucketOrds.size()); - } - } while (roundingIdx < roundingInfos.length - 1 - && (bucketOrds.size() > targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval() - || max - min > targetBuckets * roundingInfos[roundingIdx].getMaximumRoughEstimateDurationMillis())); + merge(mergeMap, bucketOrds.size()); } - }; + } while (roundingIdx < roundingInfos.length - 1 + && (bucketOrds.size() > targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval() + || max - min > targetBuckets * roundingInfos[roundingIdx].getMaximumRoughEstimateDurationMillis())); } @Override @@ -513,6 +557,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I public void collectDebugInfo(BiConsumer add) { super.collectDebugInfo(add); add.accept("surviving_buckets", bucketOrds.size()); + add.accept("skiplist_collectors_used", skiplistCollectorCount); } @Override @@ -619,6 +664,8 @@ private static class FromMany extends AutoDateHistogramAggregator { */ private int rebucketCount = 0; + private int skiplistCollectorCount = 0; + FromMany( String name, AggregatorFactories factories, @@ -661,7 +708,42 @@ protected LongKeyedBucketOrds getBucketOrds() { } @Override - protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, LeafBucketCollector sub) throws IOException { + protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, DocValuesSkipper skipper, LeafBucketCollector sub) + throws IOException { + + final NumericDocValues singleton = DocValues.unwrapSingleton(values); + if (singleton != null && skipper != null) { + // FIXME: replace isTryPrecomputePath with collector mode + /** + * HistogramSkiplistLeafCollector in its current state can only handle one owningBucketOrd at a time. + * When parent is null, FromSingle is used; FromMany is used when the auto date histogram is a sub-aggregation. + * In the special case of the filter rewrite (SubAggRangeCollector) logic, we can still use the skip list because we + * process one range (and thus one owningBucketOrd) at a time. 
+ * + * In the future we can enhance HistogramSkiplistLeafCollector to handle multiple owningBucketOrd, + * similar to FromMany. + */ + if (isTryPrecomputePath()) { + // Increment skiplist collector count + skiplistCollectorCount++; + + return new HistogramSkiplistLeafCollector( + singleton, + skipper, + (owningBucketOrd) -> preparedRoundings[roundingIndexFor(owningBucketOrd)], + () -> bucketOrds, + sub, + FromMany.this, + (owningBucketOrd, rounded) -> { + int roundingIdx = roundingIndexFor(owningBucketOrd); + liveBucketCountUnderestimate = context.bigArrays().grow(liveBucketCountUnderestimate, owningBucketOrd + 1); + int estimatedBucketCount = liveBucketCountUnderestimate.increment(owningBucketOrd, 1); + increaseRoundingIfNeeded(owningBucketOrd, estimatedBucketCount, rounded, roundingIdx); + } + ); + } + } + return new LeafBucketCollectorBase(sub, values) { @Override public void collect(int doc, long owningBucketOrd) throws IOException { @@ -707,61 +789,62 @@ private int collectValue(long owningBucketOrd, int roundingIdx, int doc, long ro return increaseRoundingIfNeeded(owningBucketOrd, estimatedBucketCount, rounded, roundingIdx); } - /** - * Increase the rounding of {@code owningBucketOrd} using - * estimated, bucket counts, {@link FromMany#rebucket()} rebucketing} the all - * buckets if the estimated number of wasted buckets is too high. - */ - private int increaseRoundingIfNeeded(long owningBucketOrd, int oldEstimatedBucketCount, long newKey, int oldRounding) { - if (oldRounding >= roundingInfos.length - 1) { - return oldRounding; - } - if (mins.size() < owningBucketOrd + 1) { - long oldSize = mins.size(); - mins = context.bigArrays().grow(mins, owningBucketOrd + 1); - mins.fill(oldSize, mins.size(), Long.MAX_VALUE); - } - if (maxes.size() < owningBucketOrd + 1) { - long oldSize = maxes.size(); - maxes = context.bigArrays().grow(maxes, owningBucketOrd + 1); - maxes.fill(oldSize, maxes.size(), Long.MIN_VALUE); - } - - long min = Math.min(mins.get(owningBucketOrd), newKey); - mins.set(owningBucketOrd, min); - long max = Math.max(maxes.get(owningBucketOrd), newKey); - maxes.set(owningBucketOrd, max); - if (oldEstimatedBucketCount <= targetBuckets * roundingInfos[oldRounding].getMaximumInnerInterval() - && max - min <= targetBuckets * roundingInfos[oldRounding].getMaximumRoughEstimateDurationMillis()) { - return oldRounding; - } - long oldRoughDuration = roundingInfos[oldRounding].roughEstimateDurationMillis; - int newRounding = oldRounding; - int newEstimatedBucketCount; - do { - newRounding++; - double ratio = (double) oldRoughDuration / (double) roundingInfos[newRounding].getRoughEstimateDurationMillis(); - newEstimatedBucketCount = (int) Math.ceil(oldEstimatedBucketCount * ratio); - } while (newRounding < roundingInfos.length - 1 - && (newEstimatedBucketCount > targetBuckets * roundingInfos[newRounding].getMaximumInnerInterval() - || max - min > targetBuckets * roundingInfos[newRounding].getMaximumRoughEstimateDurationMillis())); - setRounding(owningBucketOrd, newRounding); - mins.set(owningBucketOrd, preparedRoundings[newRounding].round(mins.get(owningBucketOrd))); - maxes.set(owningBucketOrd, preparedRoundings[newRounding].round(maxes.get(owningBucketOrd))); - wastedBucketsOverestimate += oldEstimatedBucketCount - newEstimatedBucketCount; - if (wastedBucketsOverestimate > nextRebucketAt) { - rebucket(); - // Bump the threshold for the next rebucketing - wastedBucketsOverestimate = 0; - nextRebucketAt *= 2; - } else { - liveBucketCountUnderestimate.set(owningBucketOrd, 
newEstimatedBucketCount); - } - return newRounding; - } }; } + /** + * Increase the rounding of {@code owningBucketOrd} using + * estimated, bucket counts, {@link FromMany#rebucket()} rebucketing} the all + * buckets if the estimated number of wasted buckets is too high. + */ + private int increaseRoundingIfNeeded(long owningBucketOrd, int oldEstimatedBucketCount, long newKey, int oldRounding) { + if (oldRounding >= roundingInfos.length - 1) { + return oldRounding; + } + if (mins.size() < owningBucketOrd + 1) { + long oldSize = mins.size(); + mins = context.bigArrays().grow(mins, owningBucketOrd + 1); + mins.fill(oldSize, mins.size(), Long.MAX_VALUE); + } + if (maxes.size() < owningBucketOrd + 1) { + long oldSize = maxes.size(); + maxes = context.bigArrays().grow(maxes, owningBucketOrd + 1); + maxes.fill(oldSize, maxes.size(), Long.MIN_VALUE); + } + + long min = Math.min(mins.get(owningBucketOrd), newKey); + mins.set(owningBucketOrd, min); + long max = Math.max(maxes.get(owningBucketOrd), newKey); + maxes.set(owningBucketOrd, max); + if (oldEstimatedBucketCount <= targetBuckets * roundingInfos[oldRounding].getMaximumInnerInterval() + && max - min <= targetBuckets * roundingInfos[oldRounding].getMaximumRoughEstimateDurationMillis()) { + return oldRounding; + } + long oldRoughDuration = roundingInfos[oldRounding].roughEstimateDurationMillis; + int newRounding = oldRounding; + int newEstimatedBucketCount; + do { + newRounding++; + double ratio = (double) oldRoughDuration / (double) roundingInfos[newRounding].getRoughEstimateDurationMillis(); + newEstimatedBucketCount = (int) Math.ceil(oldEstimatedBucketCount * ratio); + } while (newRounding < roundingInfos.length - 1 + && (newEstimatedBucketCount > targetBuckets * roundingInfos[newRounding].getMaximumInnerInterval() + || max - min > targetBuckets * roundingInfos[newRounding].getMaximumRoughEstimateDurationMillis())); + setRounding(owningBucketOrd, newRounding); + mins.set(owningBucketOrd, preparedRoundings[newRounding].round(mins.get(owningBucketOrd))); + maxes.set(owningBucketOrd, preparedRoundings[newRounding].round(maxes.get(owningBucketOrd))); + wastedBucketsOverestimate += oldEstimatedBucketCount - newEstimatedBucketCount; + if (wastedBucketsOverestimate > nextRebucketAt) { + rebucket(); + // Bump the threshold for the next rebucketing + wastedBucketsOverestimate = 0; + nextRebucketAt *= 2; + } else { + liveBucketCountUnderestimate.set(owningBucketOrd, newEstimatedBucketCount); + } + return newRounding; + } + private void rebucket() { rebucketCount++; try (LongKeyedBucketOrds oldOrds = bucketOrds) { @@ -806,6 +889,7 @@ public void collectDebugInfo(BiConsumer add) { add.accept("wasted_buckets_overestimate", wastedBucketsOverestimate); add.accept("next_rebucket_at", nextRebucketAt); add.accept("rebucket_count", rebucketCount); + add.accept("skiplist_collectors_used", skiplistCollectorCount); } private void setRounding(long owningBucketOrd, int newRounding) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java index cd77a17d31815..5b71c36ed8cfd 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java @@ -39,7 +39,6 @@ import org.apache.lucene.index.NumericDocValues; import 
org.apache.lucene.index.SortedNumericDocValues; import org.apache.lucene.search.DocIdStream; -import org.apache.lucene.search.Scorable; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.util.CollectionUtil; import org.opensearch.common.Nullable; @@ -63,6 +62,7 @@ import org.opensearch.search.aggregations.StarTreeBucketCollector; import org.opensearch.search.aggregations.StarTreePreComputeCollector; import org.opensearch.search.aggregations.bucket.BucketsAggregator; +import org.opensearch.search.aggregations.bucket.HistogramSkiplistLeafCollector; import org.opensearch.search.aggregations.bucket.filterrewrite.DateHistogramAggregatorBridge; import org.opensearch.search.aggregations.bucket.filterrewrite.FilterRewriteOptimizationContext; import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds; @@ -114,6 +114,7 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg private final FilterRewriteOptimizationContext filterRewriteOptimizationContext; private final String fieldName; private final boolean fieldIndexSort; + private boolean bfsMode = false; // Collector usage tracking fields private int noOpCollectorsUsed; @@ -234,9 +235,13 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol if (skipper != null && singleton != null) { // TODO: add hard bounds support - if (hardBounds == null && parent == null) { - skipListCollectorsUsed++; - return new HistogramSkiplistLeafCollector(singleton, skipper, preparedRounding, bucketOrds, sub, this); + // SkipListLeafCollector should be used if the getLeafCollector invocation is from + // filterRewriteOptimizationContext when parent != null + if (hardBounds == null) { + if (parent == null || isTryPrecomputePath()) { + skipListCollectorsUsed++; + return new HistogramSkiplistLeafCollector(singleton, skipper, preparedRounding, bucketOrds, sub, this); + } } } @@ -454,149 +459,4 @@ public double bucketSize(long bucket, Rounding.DateTimeUnit unitSize) { return 1.0; } } - - private static class HistogramSkiplistLeafCollector extends LeafBucketCollector { - - private final NumericDocValues values; - private final DocValuesSkipper skipper; - private final Rounding.Prepared preparedRounding; - private final LongKeyedBucketOrds bucketOrds; - private final LeafBucketCollector sub; - private final BucketsAggregator aggregator; - - /** - * Max doc ID (inclusive) up to which all docs values may map to the same bucket. - */ - private int upToInclusive = -1; - - /** - * Whether all docs up to {@link #upToInclusive} values map to the same bucket. - */ - private boolean upToSameBucket; - - /** - * Index in bucketOrds for docs up to {@link #upToInclusive}. 
- */ - private long upToBucketIndex; - - HistogramSkiplistLeafCollector( - NumericDocValues values, - DocValuesSkipper skipper, - Rounding.Prepared preparedRounding, - LongKeyedBucketOrds bucketOrds, - LeafBucketCollector sub, - BucketsAggregator aggregator - ) { - this.values = values; - this.skipper = skipper; - this.preparedRounding = preparedRounding; - this.bucketOrds = bucketOrds; - this.sub = sub; - this.aggregator = aggregator; - } - - @Override - public void setScorer(Scorable scorer) throws IOException { - if (sub != null) { - sub.setScorer(scorer); - } - } - - private void advanceSkipper(int doc, long owningBucketOrd) throws IOException { - if (doc > skipper.maxDocID(0)) { - skipper.advance(doc); - } - upToSameBucket = false; - - if (skipper.minDocID(0) > doc) { - // Corner case which happens if `doc` doesn't have a value and is between two intervals of - // the doc-value skip index. - upToInclusive = skipper.minDocID(0) - 1; - return; - } - - upToInclusive = skipper.maxDocID(0); - - // Now find the highest level where all docs map to the same bucket. - for (int level = 0; level < skipper.numLevels(); ++level) { - int totalDocsAtLevel = skipper.maxDocID(level) - skipper.minDocID(level) + 1; - long minBucket = preparedRounding.round(skipper.minValue(level)); - long maxBucket = preparedRounding.round(skipper.maxValue(level)); - - if (skipper.docCount(level) == totalDocsAtLevel && minBucket == maxBucket) { - // All docs at this level have a value, and all values map to the same bucket. - upToInclusive = skipper.maxDocID(level); - upToSameBucket = true; - upToBucketIndex = bucketOrds.add(owningBucketOrd, maxBucket); - if (upToBucketIndex < 0) { - upToBucketIndex = -1 - upToBucketIndex; - } - } else { - break; - } - } - } - - @Override - public void collect(int doc, long owningBucketOrd) throws IOException { - if (doc > upToInclusive) { - advanceSkipper(doc, owningBucketOrd); - } - - if (upToSameBucket) { - aggregator.incrementBucketDocCount(upToBucketIndex, 1L); - sub.collect(doc, upToBucketIndex); - } else if (values.advanceExact(doc)) { - final long value = values.longValue(); - long bucketIndex = bucketOrds.add(owningBucketOrd, preparedRounding.round(value)); - if (bucketIndex < 0) { - bucketIndex = -1 - bucketIndex; - aggregator.collectExistingBucket(sub, doc, bucketIndex); - } else { - aggregator.collectBucket(sub, doc, bucketIndex); - } - } - } - - @Override - public void collect(int doc) throws IOException { - collect(doc, 0); - } - - @Override - public void collect(DocIdStream stream) throws IOException { - // This will only be called if its the top agg - for (;;) { - int upToExclusive = upToInclusive + 1; - if (upToExclusive < 0) { // overflow - upToExclusive = Integer.MAX_VALUE; - } - - if (upToSameBucket) { - if (sub == NO_OP_COLLECTOR) { - // stream.count maybe faster when we don't need to handle sub-aggs - long count = stream.count(upToExclusive); - aggregator.incrementBucketDocCount(upToBucketIndex, count); - } else { - final int[] count = { 0 }; - stream.forEach(upToExclusive, doc -> { - sub.collect(doc, upToBucketIndex); - count[0]++; - }); - aggregator.incrementBucketDocCount(upToBucketIndex, count[0]); - } - - } else { - stream.forEach(upToExclusive, this::collect); - } - - if (stream.mayHaveRemaining()) { - advanceSkipper(upToExclusive, 0); - } else { - break; - } - } - } - - } } diff --git a/server/src/test/java/org/opensearch/search/aggregations/AggregatorBaseTests.java b/server/src/test/java/org/opensearch/search/aggregations/AggregatorBaseTests.java index 
e9df26bc0c6f3..cd7c453ac660c 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/AggregatorBaseTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/AggregatorBaseTests.java @@ -207,4 +207,26 @@ public void testShortcutIsApplicable() throws IOException { ); } } + + public void testIsTryPrecomputePath() throws IOException { + SearchContext context = mockSearchContext(null); + BogusAggregator rootAgg = new BogusAggregator(context, null); + BogusAggregator childAgg = new BogusAggregator(context, rootAgg); + BogusAggregator grandchildAgg = new BogusAggregator(context, childAgg); + + assertFalse(rootAgg.isTryPrecomputePath()); + assertFalse(childAgg.isTryPrecomputePath()); + assertFalse(grandchildAgg.isTryPrecomputePath()); + + rootAgg.precomputePath = true; + assertTrue(rootAgg.isTryPrecomputePath()); + assertTrue(childAgg.isTryPrecomputePath()); + assertTrue(grandchildAgg.isTryPrecomputePath()); + + rootAgg.precomputePath = false; + childAgg.precomputePath = true; + assertFalse(rootAgg.isTryPrecomputePath()); + assertTrue(childAgg.isTryPrecomputePath()); + assertTrue(grandchildAgg.isTryPrecomputePath()); + } } diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java index be5add530b406..0fafef30320be 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/filterrewrite/FilterRewriteSubAggTests.java @@ -11,22 +11,33 @@ import org.apache.lucene.document.Field; import org.apache.lucene.document.LongField; import org.apache.lucene.document.LongPoint; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.SortedNumericDocValuesField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.search.BooleanClause; import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.Query; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; import org.apache.lucene.store.Directory; import org.apache.lucene.tests.index.RandomIndexWriter; import org.apache.lucene.tests.util.TestUtil; +import org.opensearch.Version; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; import org.opensearch.core.common.breaker.CircuitBreaker; import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.fielddata.IndexNumericFieldData; import org.opensearch.index.mapper.DateFieldMapper; import org.opensearch.index.mapper.NumberFieldMapper; import org.opensearch.index.mapper.ParseContext; +import org.opensearch.search.MultiValueMode; import org.opensearch.search.aggregations.AggregationBuilder; import org.opensearch.search.aggregations.AggregationBuilders; import org.opensearch.search.aggregations.Aggregator; @@ -111,6 +122,87 @@ public void testRange() throws IOException { assertEquals(3, thirdAuto.getBuckets().size()); } + /** + * Test that verifies skiplist-based collection works correctly with range aggregations + * that have 
date histogram sub-aggregations. + * + * This test exercises the following code paths: + * 1. HistogramSkiplistLeafCollector.collect() - skiplist-based document collection + * 2. HistogramSkiplistLeafCollector.advanceSkipper() - skiplist advancement with upToBucket logic + * 3. SubAggRangeCollector.collect() at line 87 - sub-aggregation collection path + * + * The test uses: + * - Index sort on date field to enable skiplist functionality + * - Multiple segments created via explicit commits + * - Searchable date field type + * - Documents distributed across multiple date ranges (2015-2017) + */ + public void testRangeDate() throws IOException { + // Setup index with skiplist configuration + Settings settings = getSettingsWithIndexSort(); + IndexMetadata indexMetadata = IndexMetadata.builder("index").settings(settings).build(); + IndexSettings indexSettings = new IndexSettings(indexMetadata, settings); + + // Create searchable date field type (isSearchable=true) to enable skiplist + DateFieldMapper.DateFieldType searchableDateFieldType = aggregableDateFieldType(false, true); + + // Use custom index setup instead of setupIndex() method + try (Directory directory = newDirectory()) { + // Index documents in batches with commits to create multiple segments + indexDocsForSkiplist(directory, searchableDateFieldType); + + try (DirectoryReader indexReader = DirectoryReader.open(directory)) { + // Verify we have multiple segments (required for skiplist testing) + assertTrue("Should have multiple segments for skiplist testing", indexReader.leaves().size() > 1); + + // Create IndexSearcher with the reader + IndexSearcher indexSearcher = new IndexSearcher(indexReader); + + // Build RangeAggregationBuilder with DateHistogramAggregationBuilder sub-aggregation + // Use YEAR interval to align with our test data structure (2015, 2016, 2017) + RangeAggregationBuilder rangeAggregationBuilder = new RangeAggregationBuilder(rangeAggName).field(longFieldName) + .addRange(1, 2) + .addRange(2, 4) + .addRange(4, 6) + .subAggregation( + new DateHistogramAggregationBuilder(dateAggName).field(dateFieldName).calendarInterval(DateHistogramInterval.YEAR) + ); + + // Execute aggregation on reader with IndexSettings (enables skiplist) + InternalRange result = executeAggregationOnReader(indexReader, rangeAggregationBuilder, indexSettings); + + // Verify results - this confirms skiplist collection worked correctly + List buckets = result.getBuckets(); + assertEquals(3, buckets.size()); + + // Range bucket 1: expect 5 docs, 1 date histogram bucket (2015) + InternalRange.Bucket firstBucket = buckets.get(0); + assertEquals(5, firstBucket.getDocCount()); + InternalDateHistogram firstDate = firstBucket.getAggregations().get(dateAggName); + assertNotNull("Sub-aggregation should be present (verifies SubAggRangeCollector.collect() was called)", firstDate); + assertEquals(1, firstDate.getBuckets().size()); + assertEquals(5, firstDate.getBuckets().get(0).getDocCount()); + + // Range bucket 2: expect 8 docs, 2 date histogram buckets (2015, 2016) + InternalRange.Bucket secondBucket = buckets.get(1); + assertEquals(8, secondBucket.getDocCount()); + InternalDateHistogram secondDate = secondBucket.getAggregations().get(dateAggName); + assertNotNull("Sub-aggregation should be present (verifies SubAggRangeCollector.collect() was called)", secondDate); + assertEquals(2, secondDate.getBuckets().size()); + assertEquals(5, secondDate.getBuckets().get(0).getDocCount()); + assertEquals(3, secondDate.getBuckets().get(1).getDocCount()); + + // Range 
bucket 3: expect 7 docs, 1 date histogram bucket (2017) + InternalRange.Bucket thirdBucket = buckets.get(2); + assertEquals(7, thirdBucket.getDocCount()); + InternalDateHistogram thirdDate = thirdBucket.getAggregations().get(dateAggName); + assertNotNull("Sub-aggregation should be present (verifies SubAggRangeCollector.collect() was called)", thirdDate); + assertEquals(1, thirdDate.getBuckets().size()); + assertEquals(7, thirdDate.getBuckets().get(0).getDocCount()); + } + } + } + public void testDateHisto() throws IOException { DateHistogramAggregationBuilder dateHistogramAggregationBuilder = new DateHistogramAggregationBuilder(dateAggName).field( dateFieldName @@ -379,13 +471,21 @@ private Directory setupIndex(List docs, boolean random) throws IOExcept private IA executeAggregationOnReader( DirectoryReader indexReader, AggregationBuilder aggregationBuilder + ) throws IOException { + return executeAggregationOnReader(indexReader, aggregationBuilder, null); + } + + private IA executeAggregationOnReader( + DirectoryReader indexReader, + AggregationBuilder aggregationBuilder, + IndexSettings indexSettings ) throws IOException { IndexSearcher indexSearcher = new IndexSearcher(indexReader); MultiBucketConsumerService.MultiBucketConsumer bucketConsumer = createBucketConsumer(); SearchContext searchContext = createSearchContext( indexSearcher, - createIndexSettings(), + indexSettings != null ? indexSettings : createIndexSettings(), matchAllQuery, bucketConsumer, longFieldType, @@ -473,7 +573,7 @@ private static class SubAggToVerify { protected final DateFieldMapper.DateFieldType aggregableDateFieldType(boolean useNanosecondResolution, boolean isSearchable) { return new DateFieldMapper.DateFieldType( - "timestamp", + dateFieldName, isSearchable, false, true, @@ -483,4 +583,97 @@ protected final DateFieldMapper.DateFieldType aggregableDateFieldType(boolean us Collections.emptyMap() ); } + + /** + * Helper method to create Settings with index sort on date field for skiplist testing + */ + private Settings getSettingsWithIndexSort() { + return Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .putList("index.sort.field", dateFieldName) + .build(); + } + + /** + * Helper method to index documents in batches with commits for skiplist structure + */ + private void indexDocsForSkiplist(Directory directory, DateFieldMapper.DateFieldType dateFieldType) throws IOException { + IndexWriterConfig config = new IndexWriterConfig(); + config.setMergePolicy(NoMergePolicy.INSTANCE); + + // Create sort field for index sort + IndexNumericFieldData fieldData = (IndexNumericFieldData) dateFieldType.fielddataBuilder("index", () -> { + throw new UnsupportedOperationException(); + }).build(null, null); + SortField sortField = fieldData.sortField(null, MultiValueMode.MIN, null, false); + config.setIndexSort(new Sort(sortField)); + + try (IndexWriter indexWriter = new IndexWriter(directory, config)) { + // First commit - documents for range bucket 1 (metric values 1-2) + // Dates: 2015 (5 docs with metric=1) + for (int i = 0; i < 5; i++) { + org.apache.lucene.document.Document doc = new org.apache.lucene.document.Document(); + long timestamp = asLong("2015-02-13T13:09:32Z", dateFieldType); + doc.add(SortedNumericDocValuesField.indexedField(dateFieldName, timestamp)); + doc.add(new LongPoint(dateFieldName, timestamp)); + doc.add(new NumericDocValuesField(longFieldName, 1)); + doc.add(new 
LongPoint(longFieldName, 1)); + indexWriter.addDocument(doc); + } + indexWriter.commit(); + + // Second commit - documents for range bucket 2 (metric values 2-4) + // Dates: 2015-2016 (5 docs with metric=2, 3 docs with metric=3) + for (int i = 0; i < 5; i++) { + org.apache.lucene.document.Document doc = new org.apache.lucene.document.Document(); + long timestamp = asLong("2015-11-13T16:14:34Z", dateFieldType); + doc.add(SortedNumericDocValuesField.indexedField(dateFieldName, timestamp)); + doc.add(new LongPoint(dateFieldName, timestamp)); + doc.add(new NumericDocValuesField(longFieldName, 2)); + doc.add(new LongPoint(longFieldName, 2)); + indexWriter.addDocument(doc); + } + for (int i = 0; i < 3; i++) { + org.apache.lucene.document.Document doc = new org.apache.lucene.document.Document(); + long timestamp = asLong("2016-03-04T17:09:50Z", dateFieldType); + doc.add(SortedNumericDocValuesField.indexedField(dateFieldName, timestamp)); + doc.add(new LongPoint(dateFieldName, timestamp)); + doc.add(new NumericDocValuesField(longFieldName, 3)); + doc.add(new LongPoint(longFieldName, 3)); + indexWriter.addDocument(doc); + } + indexWriter.commit(); + + // Third commit - documents for range bucket 3 (metric values 4-6) + // Dates: 2017 (4 docs with metric=4, 3 docs with metric=5) + for (int i = 0; i < 4; i++) { + org.apache.lucene.document.Document doc = new org.apache.lucene.document.Document(); + long timestamp = asLong("2017-12-12T22:55:46Z", dateFieldType); + doc.add(SortedNumericDocValuesField.indexedField(dateFieldName, timestamp)); + doc.add(new LongPoint(dateFieldName, timestamp)); + doc.add(new NumericDocValuesField(longFieldName, 4)); + doc.add(new LongPoint(longFieldName, 4)); + indexWriter.addDocument(doc); + } + for (int i = 0; i < 3; i++) { + org.apache.lucene.document.Document doc = new org.apache.lucene.document.Document(); + long timestamp = asLong("2017-12-12T22:55:46Z", dateFieldType); + doc.add(SortedNumericDocValuesField.indexedField(dateFieldName, timestamp)); + doc.add(new LongPoint(dateFieldName, timestamp)); + doc.add(new NumericDocValuesField(longFieldName, 5)); + doc.add(new LongPoint(longFieldName, 5)); + indexWriter.addDocument(doc); + } + indexWriter.commit(); + } + } + + /** + * Helper method to parse date strings to long values + */ + private long asLong(String dateTime, DateFieldMapper.DateFieldType fieldType) { + return fieldType.parse(dateTime); + } } diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java index 95f56d779b088..5eab3da2b32fe 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java @@ -95,7 +95,7 @@ import static org.hamcrest.Matchers.hasSize; public class AutoDateHistogramAggregatorTests extends DateHistogramAggregatorTestCase { - private static final String DATE_FIELD = "date"; + private static final String DATE_FIELD = "@timestamp"; private static final String INSTANT_FIELD = "instant"; private static final String NUMERIC_FIELD = "numeric"; @@ -986,10 +986,20 @@ private void testSearchCase( final List dataset, final Consumer configure, final Consumer verify + ) throws IOException { + testSearchCase(query, dataset, false, configure, verify); + } + + private void testSearchCase( + final Query 
query, + final List dataset, + final boolean enableSkiplist, + final Consumer configure, + final Consumer verify ) throws IOException { try (Directory directory = newDirectory()) { try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { - indexSampleData(dataset, indexWriter); + indexSampleData(dataset, indexWriter, enableSkiplist); } try (IndexReader indexReader = DirectoryReader.open(directory)) { @@ -1019,11 +1029,19 @@ private void testSearchCase( } private void indexSampleData(List dataset, RandomIndexWriter indexWriter) throws IOException { + indexSampleData(dataset, indexWriter, false); + } + + private void indexSampleData(List dataset, RandomIndexWriter indexWriter, boolean enableSkiplist) throws IOException { final Document document = new Document(); int i = 0; for (final ZonedDateTime date : dataset) { final long instant = date.toInstant().toEpochMilli(); - document.add(new SortedNumericDocValuesField(DATE_FIELD, instant)); + if (enableSkiplist) { + document.add(SortedNumericDocValuesField.indexedField(DATE_FIELD, instant)); + } else { + document.add(new SortedNumericDocValuesField(DATE_FIELD, instant)); + } document.add(new LongPoint(DATE_FIELD, instant)); document.add(new LongPoint(INSTANT_FIELD, instant)); document.add(new SortedNumericDocValuesField(NUMERIC_FIELD, i)); @@ -1052,6 +1070,151 @@ private Map maxAsMap(InternalAutoDateHistogram result) { return map; } + /** + * Test that verifies FromSingle.increaseRoundingIfNeeded() works correctly with skiplist collector. + * This test validates: + * 1. preparedRounding field update is visible to skiplist collector + * 2. New Rounding.Prepared instance is created on rounding change + * 3. owningBucketOrd is always 0 in FromSingle context + * 4. Bucket merging works correctly after rounding change + * + * Requirements: 3.1, 3.2, 3.4 + */ + public void testSkiplistCollectorWithRoundingChange() throws IOException { + // Create a dataset that will trigger rounding changes + // Start with hourly data, then add data that spans months to force rounding increase + final List dataset = new ArrayList<>(); + + // Add hourly data for first day (24 docs) + ZonedDateTime start = ZonedDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); + for (int hour = 0; hour < 24; hour++) { + dataset.add(start.plusHours(hour)); + } + + // Add data spanning several months to trigger rounding increase (30 docs) + for (int month = 0; month < 6; month++) { + for (int day = 0; day < 5; day++) { + dataset.add(start.plusMonths(month).plusDays(day * 6)); + } + } + + testSearchCase(DEFAULT_QUERY, dataset, true, aggregation -> aggregation.setNumBuckets(10).field(DATE_FIELD), histogram -> { + // Verify that aggregation completed successfully + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + + // Verify buckets were created (exact count depends on rounding chosen) + assertFalse(histogram.getBuckets().isEmpty()); + assertTrue(histogram.getBuckets().size() <= 10); + + // Verify total doc count matches input + long totalDocs = histogram.getBuckets().stream().mapToLong(Histogram.Bucket::getDocCount).sum(); + assertEquals(54, totalDocs); // 24 + 30 = 54 docs + + // Verify buckets are in ascending order (requirement for histogram) + List buckets = histogram.getBuckets(); + for (int i = 1; i < buckets.size(); i++) { + assertTrue(((ZonedDateTime) buckets.get(i - 1).getKey()).isBefore((ZonedDateTime) buckets.get(i).getKey())); + } + }); + } + + /** + * Test that verifies skiplist collector handles rounding changes correctly with 
sub-aggregations. + * This ensures that when rounding changes mid-collection, sub-aggregations still produce correct results. + * + * Requirements: 3.1, 3.2, 3.4 + */ + public void testSkiplistCollectorRoundingChangeWithSubAggs() throws IOException { + // Create dataset that triggers rounding change + final List dataset = new ArrayList<>(); + ZonedDateTime start = ZonedDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); + + // Add data spanning months to trigger rounding increase + for (int month = 0; month < 12; month++) { + for (int day = 0; day < 3; day++) { + dataset.add(start.plusMonths(month).plusDays(day * 10)); + } + } + + testSearchCase( + DEFAULT_QUERY, + dataset, + true, + aggregation -> aggregation.setNumBuckets(8) + .field(DATE_FIELD) + .subAggregation(AggregationBuilders.stats("stats").field(DATE_FIELD)), + histogram -> { + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + + // Verify buckets were created + assertFalse(histogram.getBuckets().isEmpty()); + assertTrue(histogram.getBuckets().size() <= 8); + + // Verify sub-aggregations are present and valid + for (Histogram.Bucket bucket : histogram.getBuckets()) { + InternalStats stats = bucket.getAggregations().get("stats"); + assertNotNull(stats); + if (bucket.getDocCount() > 0) { + assertTrue(AggregationInspectionHelper.hasValue(stats)); + assertTrue(stats.getCount() > 0); + } + } + + // Verify total doc count + long totalDocs = histogram.getBuckets().stream().mapToLong(Histogram.Bucket::getDocCount).sum(); + assertEquals(36, totalDocs); // 12 months * 3 days = 36 docs + } + ); + } + + /** + * Test that verifies bucket merging works correctly after rounding change. + * This test creates a scenario where buckets must be merged when rounding increases. + * + * Requirements: 3.2, 3.4 + */ + public void testSkiplistCollectorBucketMergingAfterRoundingChange() throws IOException { + // Create dataset with fine-grained data that will be merged into coarser buckets + final List dataset = new ArrayList<>(); + ZonedDateTime start = ZonedDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); + + // Add hourly data for multiple days, then add data spanning years + // This forces rounding to increase from hours -> days -> months -> years + for (int day = 0; day < 5; day++) { + for (int hour = 0; hour < 24; hour++) { + dataset.add(start.plusDays(day).plusHours(hour)); + } + } + + // Add data spanning multiple years to force coarse rounding + for (int year = 0; year < 5; year++) { + for (int month = 0; month < 12; month += 3) { + dataset.add(start.plusYears(year).plusMonths(month)); + } + } + + testSearchCase(DEFAULT_QUERY, dataset, true, aggregation -> aggregation.setNumBuckets(5).field(DATE_FIELD), histogram -> { + assertTrue(AggregationInspectionHelper.hasValue(histogram)); + + // Verify buckets were created and merged appropriately + assertFalse(histogram.getBuckets().isEmpty()); + assertTrue(histogram.getBuckets().size() <= 5); + + // Verify total doc count is preserved after merging + long totalDocs = histogram.getBuckets().stream().mapToLong(Histogram.Bucket::getDocCount).sum(); + assertEquals(140, totalDocs); // (5 days * 24 hours) + (5 years * 4 quarters) = 120 + 20 = 140 + + Map expectedDocCount = new TreeMap<>(); + expectedDocCount.put("2020-01-01T00:00:00.000Z", 124); // 5 * 24 hours + 4 quarters + expectedDocCount.put("2021-01-01T00:00:00.000Z", 4); + expectedDocCount.put("2022-01-01T00:00:00.000Z", 4); + expectedDocCount.put("2023-01-01T00:00:00.000Z", 4); + expectedDocCount.put("2024-01-01T00:00:00.000Z", 4); + + 
assertThat(bucketCountsAsMap(histogram), equalTo(expectedDocCount)); + }); + } + @Override public void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) { /*