diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0f5d58c376e83..6f6dce3b77c26 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -42,6 +42,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Implement GRPC GeoBoundingBox, GeoDistance queries ([#19451](https://github.com/opensearch-project/OpenSearch/pull/19451))
 - Implement GRPC Ids, Range, and Terms Set queries ([#19448](https://github.com/opensearch-project/OpenSearch/pull/19448))
 - Implement GRPC Nested query ([#19453](https://github.com/opensearch-project/OpenSearch/pull/19453))
+- Add sub-aggregation support for histogram aggregation using skiplist ([#19438](https://github.com/opensearch-project/OpenSearch/pull/19438))
 - Optimization in String Terms Aggregation query for Large Bucket Counts([#18732](https://github.com/opensearch-project/OpenSearch/pull/18732))
 
 ### Changed
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java
index cd3317f269a02..a4e625bbfaad9 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java
@@ -115,6 +115,12 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg
     private final String fieldName;
     private final boolean fieldIndexSort;
 
+    // Collector usage tracking fields
+    private int noOpCollectorsUsed;
+    private int singleValuedCollectorsUsed;
+    private int multiValuedCollectorsUsed;
+    private int skipListCollectorsUsed;
+
     DateHistogramAggregator(
         String name,
         AggregatorFactories factories,
@@ -215,6 +221,7 @@ protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws
     @Override
     public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
         if (valuesSource == null) {
+            noOpCollectorsUsed++;
            return LeafBucketCollector.NO_OP_COLLECTOR;
         }
 
@@ -225,17 +232,17 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol
         final SortedNumericDocValues values = valuesSource.longValues(ctx);
         final NumericDocValues singleton = DocValues.unwrapSingleton(values);
 
-        // If no subaggregations and index sorted on given field, we can use skip list based collector
-        logger.trace("Index sort field found: {}, skipper: {}", fieldIndexSort, skipper);
-        if (fieldIndexSort && skipper != null && singleton != null) {
+        if (skipper != null && singleton != null) {
             // TODO: add hard bounds support
-            if (hardBounds == null && (sub == null || sub == LeafBucketCollector.NO_OP_COLLECTOR)) {
-                return new HistogramSkiplistLeafCollector(singleton, skipper, preparedRounding, bucketOrds, this::incrementBucketDocCount);
+            if (hardBounds == null && parent == null) {
+                skipListCollectorsUsed++;
+                return new HistogramSkiplistLeafCollector(singleton, skipper, preparedRounding, bucketOrds, sub, this);
             }
         }
 
         if (singleton != null) {
             // Optimized path for single-valued fields
+            singleValuedCollectorsUsed++;
             return new LeafBucketCollectorBase(sub, values) {
                 @Override
                 public void collect(int doc, long owningBucketOrd) throws IOException {
@@ -248,6 +255,7 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
         }
 
         // Original path for multi-valued fields
+        multiValuedCollectorsUsed++;
         return new LeafBucketCollectorBase(sub, values) {
             @Override
             public void collect(int doc, long owningBucketOrd) throws IOException {
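For readers skimming the hunks above: the skip-list collector is no longer gated on fieldIndexSort or on the absence of sub-aggregations; it now only requires a doc-values skipper, a single-valued field, no hard bounds, and a top-level histogram (parent == null). A minimal, self-contained sketch of that decision order — CollectorKind and the boolean parameters are illustrative stand-ins, not OpenSearch types; the real method returns LeafBucketCollector instances:

// Sketch only: models the collector-selection order in getLeafCollector above.
public final class CollectorChoiceSketch {
    enum CollectorKind { NO_OP, SKIP_LIST, SINGLE_VALUED, MULTI_VALUED }

    static CollectorKind choose(
        boolean hasValuesSource, // valuesSource != null
        boolean hasSkipper,      // a DocValuesSkipper exists for the field
        boolean singleValued,    // DocValues.unwrapSingleton(values) != null
        boolean hasHardBounds,   // hard_bounds set on the histogram (still TODO)
        boolean topLevelAgg      // parent == null
    ) {
        if (!hasValuesSource) {
            return CollectorKind.NO_OP;
        }
        // Sub-aggregations no longer disqualify the skip-list path; only hard
        // bounds and being nested under another aggregation do.
        if (hasSkipper && singleValued && !hasHardBounds && topLevelAgg) {
            return CollectorKind.SKIP_LIST;
        }
        return singleValued ? CollectorKind.SINGLE_VALUED : CollectorKind.MULTI_VALUED;
    }

    public static void main(String[] args) {
        // Date histogram with a max sub-agg over a skipper-enabled field:
        System.out.println(choose(true, true, true, false, true));  // SKIP_LIST
        // The same histogram nested under a terms aggregation falls back:
        System.out.println(choose(true, true, true, false, false)); // SINGLE_VALUED
    }
}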
@@ -404,8 +412,13 @@ public void doClose() {
 
     @Override
     public void collectDebugInfo(BiConsumer<String, Object> add) {
+        super.collectDebugInfo(add);
         add.accept("total_buckets", bucketOrds.size());
         filterRewriteOptimizationContext.populateDebugInfo(add);
+        add.accept("no_op_collectors_used", noOpCollectorsUsed);
+        add.accept("single_valued_collectors_used", singleValuedCollectorsUsed);
+        add.accept("multi_valued_collectors_used", multiValuedCollectorsUsed);
+        add.accept("skip_list_collectors_used", skipListCollectorsUsed);
     }
 
     /**
@@ -426,7 +439,8 @@ private static class HistogramSkiplistLeafCollector extends LeafBucketCollector
         private final DocValuesSkipper skipper;
         private final Rounding.Prepared preparedRounding;
         private final LongKeyedBucketOrds bucketOrds;
-        private final BiConsumer<Long, Long> incrementDocCount;
+        private final LeafBucketCollector sub;
+        private final BucketsAggregator aggregator;
 
         /**
          * Max doc ID (inclusive) up to which all docs values may map to the same bucket.
@@ -448,19 +462,25 @@ private static class HistogramSkiplistLeafCollector extends LeafBucketCollector
             DocValuesSkipper skipper,
             Rounding.Prepared preparedRounding,
             LongKeyedBucketOrds bucketOrds,
-            BiConsumer<Long, Long> incrementDocCount
+            LeafBucketCollector sub,
+            BucketsAggregator aggregator
         ) {
             this.values = values;
             this.skipper = skipper;
             this.preparedRounding = preparedRounding;
             this.bucketOrds = bucketOrds;
-            this.incrementDocCount = incrementDocCount;
+            this.sub = sub;
+            this.aggregator = aggregator;
         }
 
         @Override
-        public void setScorer(Scorable scorer) throws IOException {}
+        public void setScorer(Scorable scorer) throws IOException {
+            if (sub != null) {
+                sub.setScorer(scorer);
+            }
+        }
 
-        private void advanceSkipper(int doc) throws IOException {
+        private void advanceSkipper(int doc, long owningBucketOrd) throws IOException {
             if (doc > skipper.maxDocID(0)) {
                 skipper.advance(doc);
             }
@@ -485,7 +505,7 @@ private void advanceSkipper(int doc) throws IOException {
                     // All docs at this level have a value, and all values map to the same bucket.
                     upToInclusive = skipper.maxDocID(level);
                     upToSameBucket = true;
-                    upToBucketIndex = bucketOrds.add(0, maxBucket);
+                    upToBucketIndex = bucketOrds.add(owningBucketOrd, maxBucket);
                     if (upToBucketIndex < 0) {
                         upToBucketIndex = -1 - upToBucketIndex;
                     }
@@ -497,29 +517,33 @@
 
         @Override
         public void collect(int doc, long owningBucketOrd) throws IOException {
-            collect(doc);
-        }
-
-        @Override
-        public void collect(int doc) throws IOException {
             if (doc > upToInclusive) {
-                advanceSkipper(doc);
+                advanceSkipper(doc, owningBucketOrd);
             }
 
             if (upToSameBucket) {
-                incrementDocCount.accept(upToBucketIndex, 1L);
+                aggregator.incrementBucketDocCount(upToBucketIndex, 1L);
+                sub.collect(doc, upToBucketIndex);
             } else if (values.advanceExact(doc)) {
                 final long value = values.longValue();
-                long bucketIndex = bucketOrds.add(0, preparedRounding.round(value));
+                long bucketIndex = bucketOrds.add(owningBucketOrd, preparedRounding.round(value));
                 if (bucketIndex < 0) {
                     bucketIndex = -1 - bucketIndex;
+                    aggregator.collectExistingBucket(sub, doc, bucketIndex);
+                } else {
+                    aggregator.collectBucket(sub, doc, bucketIndex);
                 }
-                incrementDocCount.accept(bucketIndex, 1L);
             }
         }
 
+        @Override
+        public void collect(int doc) throws IOException {
+            collect(doc, 0);
+        }
+
         @Override
         public void collect(DocIdStream stream) throws IOException {
+            // This will only be called if it's the top agg
             for (;;) {
                 int upToExclusive = upToInclusive + 1;
                 if (upToExclusive < 0) { // overflow
@@ -527,18 +551,30 @@ public void collect(DocIdStream stream) throws IOException {
                 }
 
                 if (upToSameBucket) {
-                    long count = stream.count(upToExclusive);
-                    incrementDocCount.accept(upToBucketIndex, count);
+                    if (sub == NO_OP_COLLECTOR) {
+                        // stream.count may be faster when we don't need to handle sub-aggs
+                        long count = stream.count(upToExclusive);
+                        aggregator.incrementBucketDocCount(upToBucketIndex, count);
+                    } else {
+                        final int[] count = { 0 };
+                        stream.forEach(upToExclusive, doc -> {
+                            sub.collect(doc, upToBucketIndex);
+                            count[0]++;
+                        });
+                        aggregator.incrementBucketDocCount(upToBucketIndex, count[0]);
+                    }
+
                 } else {
                     stream.forEach(upToExclusive, this::collect);
                 }
 
                 if (stream.mayHaveRemaining()) {
-                    advanceSkipper(upToExclusive);
+                    advanceSkipper(upToExclusive, 0);
                 } else {
                     break;
                 }
             }
         }
+
     }
 }
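The bulk collect(DocIdStream) change above is the performance-sensitive piece: when a whole range of doc IDs is known to land in one bucket, the old code only needed stream.count(...), but with sub-aggregations each document must still be pushed to the sub-collector. A self-contained sketch of that trade-off, where DocRange is a hypothetical stand-in for Lucene's DocIdStream and the IntConsumer stands in for the sub-collector:

import java.util.function.IntConsumer;
import java.util.stream.IntStream;

// Sketch only: models the same-bucket branch of collect(DocIdStream) above.
public final class BulkCollectSketch {
    interface DocRange {               // stand-in for Lucene's DocIdStream
        long count(int upToExclusive);
        void forEach(int upToExclusive, IntConsumer consumer);
    }

    static long collectSameBucketRange(DocRange docs, int upToExclusive, IntConsumer subCollector) {
        if (subCollector == null) {    // analogous to sub == NO_OP_COLLECTOR
            return docs.count(upToExclusive);  // cheap: no per-doc work at all
        }
        final long[] count = { 0 };
        docs.forEach(upToExclusive, doc -> {   // per-doc: the sub-agg must see every doc
            subCollector.accept(doc);
            count[0]++;
        });
        return count[0];
    }

    public static void main(String[] args) {
        DocRange docs = new DocRange() {
            public long count(int upTo) { return upTo; }
            public void forEach(int upTo, IntConsumer c) { IntStream.range(0, upTo).forEach(c); }
        };
        System.out.println(collectSameBucketRange(docs, 5, null));    // 5, counted in bulk
        System.out.println(collectSameBucketRange(docs, 5, d -> {})); // 5, visited per doc
    }
}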
diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java
index 4574a75a08aca..a34a3c774212a 100644
--- a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java
+++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java
@@ -61,6 +61,7 @@
 import org.opensearch.index.mapper.DateFieldMapper;
 import org.opensearch.index.mapper.DocCountFieldMapper;
 import org.opensearch.index.mapper.MappedFieldType;
+import org.opensearch.index.mapper.NumberFieldMapper;
 import org.opensearch.search.MultiValueMode;
 import org.opensearch.search.aggregations.AggregationBuilder;
 import org.opensearch.search.aggregations.BucketOrder;
@@ -68,6 +69,7 @@
 import org.opensearch.search.aggregations.MultiBucketConsumerService;
 import org.opensearch.search.aggregations.bucket.terms.StringTerms;
 import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
+import org.opensearch.search.aggregations.metrics.MaxAggregationBuilder;
 import org.opensearch.search.aggregations.pipeline.PipelineAggregator;
 import org.opensearch.search.aggregations.support.AggregationInspectionHelper;
@@ -257,12 +259,7 @@ public void testAsSubAgg() throws IOException {
 
     public void testSkiplistWithSingleValueDates() throws IOException {
         // Create index settings with an index sort.
-        Settings settings = Settings.builder()
-            .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
-            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
-            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
-            .putList("index.sort.field", AGGREGABLE_DATE)
-            .build();
+        Settings settings = getSettingsWithIndexSort();
 
         IndexMetadata indexMetadata = new IndexMetadata.Builder("index").settings(settings).build();
         IndexSettings indexSettings = new IndexSettings(indexMetadata, settings);
@@ -277,44 +274,7 @@ public void testSkiplistWithSingleValueDates() throws IOException {
         config.setMergePolicy(NoMergePolicy.INSTANCE);
         config.setIndexSort(new Sort(sortField));
         String filterField = "type";
-        try (IndexWriter indexWriter = new IndexWriter(directory, config)) {
-
-            // First commit - 5 dates with type 1
-            for (int i = 0; i < 5; i++) {
-                Document doc = new Document();
-                long timestamp = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(DATASET.get(i)))
-                    .toInstant()
-                    .toEpochMilli();
-                doc.add(SortedNumericDocValuesField.indexedField(AGGREGABLE_DATE, timestamp));
-                doc.add(new LongPoint(filterField, 1));
-                indexWriter.addDocument(doc);
-            }
-            indexWriter.commit();
-
-            // Second commit - 3 more dates with type 2, skiplist
-            for (int i = 5; i < 8; i++) {
-                Document doc = new Document();
-                long timestamp = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(DATASET.get(i)))
-                    .toInstant()
-                    .toEpochMilli();
-                doc.add(SortedNumericDocValuesField.indexedField(AGGREGABLE_DATE, timestamp));
-                doc.add(new LongPoint(filterField, 2));
-                indexWriter.addDocument(doc);
-            }
-            indexWriter.commit();
-
-            // Third commit - 3 more dates with type 2
-            for (int i = 8; i < 10; i++) {
-                Document doc = new Document();
-                long timestamp = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(DATASET.get(i)))
-                    .toInstant()
-                    .toEpochMilli();
-                doc.add(SortedNumericDocValuesField.indexedField(AGGREGABLE_DATE, timestamp));
-                doc.add(new LongPoint(filterField, 2));
-                indexWriter.addDocument(doc);
-            }
-            indexWriter.commit();
-        }
+        indexDocsForSkiplist(directory, config, filterField, null);
 
         try (IndexReader indexReader = DirectoryReader.open(directory)) {
             IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
@@ -333,7 +293,6 @@ public void testSkiplistWithSingleValueDates() throws IOException {
                 false,
                 fieldType
             );
-
             assertEquals(3, histogram.getBuckets().size()); // 2015, 2016, 2017 (only type 2 docs)
             assertEquals("2015-01-01T00:00:00.000Z", histogram.getBuckets().get(0).getKeyAsString());
@@ -349,6 +308,150 @@
     }
 
+    public void testSkiplistWithSingleValueDatesAndSubAggs() throws IOException {
+        // Create index settings with an index sort.
+        Settings settings = getSettingsWithIndexSort();
+
+        IndexMetadata indexMetadata = new IndexMetadata.Builder("index").settings(settings).build();
+        IndexSettings indexSettings = new IndexSettings(indexMetadata, settings);
+
+        MappedFieldType dateType = new DateFieldMapper.DateFieldType(AGGREGABLE_DATE);
+        String categoryField = "category";
+        NumberFieldMapper.NumberFieldType categoryType = new NumberFieldMapper.NumberFieldType(
+            categoryField,
+            NumberFieldMapper.NumberType.LONG
+        );
+
+        IndexNumericFieldData fieldData = (IndexNumericFieldData) dateType.fielddataBuilder("index", () -> {
+            throw new UnsupportedOperationException();
+        }).build(null, null);
+        SortField sortField = fieldData.sortField(null, MultiValueMode.MIN, null, false);
+        try (Directory directory = newDirectory()) {
+            IndexWriterConfig config = newIndexWriterConfig();
+            config.setMergePolicy(NoMergePolicy.INSTANCE);
+            config.setIndexSort(new Sort(sortField));
+            String filterField = "type";
+            indexDocsForSkiplist(directory, config, filterField, categoryField);
+
+            try (IndexReader indexReader = DirectoryReader.open(directory)) {
+                IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
+
+                // Create date histogram with max sub-aggregation
+                DateHistogramAggregationBuilder aggregationBuilder = new DateHistogramAggregationBuilder("test").field(AGGREGABLE_DATE)
+                    .calendarInterval(DateHistogramInterval.YEAR)
+                    .subAggregation(new MaxAggregationBuilder(categoryField).field(categoryField));
+
+                Query query = LongPoint.newExactQuery(filterField, 2);
+
+                InternalDateHistogram histogram = searchAndReduce(
+                    indexSettings,
+                    indexSearcher,
+                    query,
+                    aggregationBuilder,
+                    1000,
+                    false,
+                    dateType,
+                    categoryType
+                );
+
+                assertEquals(3, histogram.getBuckets().size()); // 2015, 2016, 2017 (only type 2 docs)
+
+                // Verify first bucket (2015) with sub-aggregations
+                InternalDateHistogram.Bucket bucket2015 = (InternalDateHistogram.Bucket) histogram.getBuckets().get(0);
+                assertEquals("2015-01-01T00:00:00.000Z", bucket2015.getKeyAsString());
+                assertEquals(3, bucket2015.getDocCount());
+
+                // Assert sub-aggregation values for 2015 bucket (docs 5,6,7 with categories 1,0,1)
+                assertNotNull("Sub-aggregation should exist for 2015 bucket", bucket2015.getAggregations());
+                org.opensearch.search.aggregations.metrics.InternalMax maxAgg2015 = bucket2015.getAggregations().get(categoryField);
+                assertNotNull("Max sub-agg should exist", maxAgg2015);
+                assertEquals("Max category value for 2015 bucket should be 1", 1.0, maxAgg2015.getValue(), 0.0);
+
+                // Verify second bucket (2016)
+                InternalDateHistogram.Bucket bucket2016 = (InternalDateHistogram.Bucket) histogram.getBuckets().get(1);
+                assertEquals("2016-01-01T00:00:00.000Z", bucket2016.getKeyAsString());
+                assertEquals(1, bucket2016.getDocCount());
+
+                // Assert sub-aggregation values for 2016 bucket (doc 8 with category 0)
+                assertNotNull("Sub-aggregation should exist for 2016 bucket", bucket2016.getAggregations());
+                org.opensearch.search.aggregations.metrics.InternalMax maxAgg2016 = bucket2016.getAggregations().get(categoryField);
+                assertNotNull("Max sub-agg should exist", maxAgg2016);
+                assertEquals("Max category value for 2016 bucket should be 0", 0.0, maxAgg2016.getValue(), 0.0);
+
+                // Verify third bucket (2017)
+                InternalDateHistogram.Bucket bucket2017 = (InternalDateHistogram.Bucket) histogram.getBuckets().get(2);
+                assertEquals("2017-01-01T00:00:00.000Z", bucket2017.getKeyAsString());
+                assertEquals(1, bucket2017.getDocCount());
+
+                // Assert sub-aggregation values for 2017 bucket (doc 9 with category 1)
+                assertNotNull("Sub-aggregation should exist for 2017 bucket", bucket2017.getAggregations());
+                org.opensearch.search.aggregations.metrics.InternalMax maxAgg2017 = bucket2017.getAggregations().get(categoryField);
+                assertNotNull("Max sub-agg should exist", maxAgg2017);
+                assertEquals("Max category value for 2017 bucket should be 1", 1.0, maxAgg2017.getValue(), 0.0);
+            }
+        }
+    }
+
+    private static void indexDocsForSkiplist(Directory directory, IndexWriterConfig config, String filterField, String categoryField)
+        throws IOException {
+        try (IndexWriter indexWriter = new IndexWriter(directory, config)) {
+
+            // First commit - 5 dates with type 1
+            for (int i = 0; i < 5; i++) {
+                Document doc = new Document();
+                long timestamp = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(DATASET.get(i)))
+                    .toInstant()
+                    .toEpochMilli();
+                doc.add(SortedNumericDocValuesField.indexedField(AGGREGABLE_DATE, timestamp));
+                doc.add(new LongPoint(filterField, 1));
+                if (categoryField != null) {
+                    doc.add(new NumericDocValuesField(categoryField, i % 2));
+                }
+                indexWriter.addDocument(doc);
+            }
+            indexWriter.commit();
+
+            // Second commit - 3 more dates with type 2, skiplist
+            for (int i = 5; i < 8; i++) {
+                Document doc = new Document();
+                long timestamp = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(DATASET.get(i)))
+                    .toInstant()
+                    .toEpochMilli();
+                doc.add(SortedNumericDocValuesField.indexedField(AGGREGABLE_DATE, timestamp));
+                doc.add(new LongPoint(filterField, 2));
+                if (categoryField != null) {
+                    doc.add(new NumericDocValuesField(categoryField, i % 2));
+                }
+                indexWriter.addDocument(doc);
+            }
+            indexWriter.commit();
+
+            // Third commit - 2 more dates with type 2
+            for (int i = 8; i < 10; i++) {
+                Document doc = new Document();
+                long timestamp = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(DATASET.get(i)))
+                    .toInstant()
+                    .toEpochMilli();
+                doc.add(SortedNumericDocValuesField.indexedField(AGGREGABLE_DATE, timestamp));
+                doc.add(new LongPoint(filterField, 2));
+                if (categoryField != null) {
+                    doc.add(new NumericDocValuesField(categoryField, i % 2));
+                }
+                indexWriter.addDocument(doc);
+            }
+            indexWriter.commit();
+        }
+    }
+
+    private static Settings getSettingsWithIndexSort() {
+        return Settings.builder()
+            .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
+            .putList("index.sort.field", AGGREGABLE_DATE)
+            .build();
+    }
+
     public void testNoDocsDeprecatedInterval() throws IOException {
         Query query = new MatchNoDocsQuery();
         List<String> dates = Collections.emptyList();
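Finally, on observability: the counters added to collectDebugInfo surface in the aggregation's debug output (for example under the search profiler). A self-contained sketch of the contract — fakeCollectDebugInfo and its values are hypothetical stand-ins for calling aggregator.collectDebugInfo(debug::put) on a real DateHistogramAggregator in a test:

import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Sketch only: collectDebugInfo pushes key/value pairs into a
// BiConsumer<String, Object>; a Map::put method reference is the usual sink.
public final class DebugInfoSketch {
    static void fakeCollectDebugInfo(BiConsumer<String, Object> add) {
        // Illustrative values for a single skip-list-eligible segment.
        add.accept("total_buckets", 3L);
        add.accept("skip_list_collectors_used", 1);
        add.accept("multi_valued_collectors_used", 0);
    }

    public static void main(String[] args) {
        Map<String, Object> debug = new HashMap<>();
        fakeCollectDebugInfo(debug::put);
        System.out.println(debug); // e.g. {total_buckets=3, skip_list_collectors_used=1, ...}
    }
}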