From 5d452330595a34dc601d924ed127d9b5ef5e35b1 Mon Sep 17 00:00:00 2001 From: Asim Mahmood Date: Fri, 26 Sep 2025 08:53:16 -0700 Subject: [PATCH 1/3] Handle sub aggregation with date aggregation as top level Signed-off-by: Asim Mahmood --- CHANGELOG.md | 1 + .../histogram/DateHistogramAggregator.java | 47 +++--- .../DateHistogramAggregatorTests.java | 142 +++++++++++++++++- 3 files changed, 167 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 26c08c34a1a3c..172450bdbb80a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Search Stats] Add search & star-tree search query failure count metrics ([#19210](https://github.com/opensearch-project/OpenSearch/issues/19210)) - [Star-tree] Support for multi-terms aggregation ([#18398](https://github.com/opensearch-project/OpenSearch/issues/18398)) - Add stream search feature flag and auto fallback logic ([#19373](https://github.com/opensearch-project/OpenSearch/pull/19373)) +- Add sub aggregation support for histogram aggregation using skiplist ([#19438](https://github.com/opensearch-project/OpenSearch/pull/19438)) ### Changed - Refactor `if-else` chains to use `Java 17 pattern matching switch expressions`(([#18965](https://github.com/opensearch-project/OpenSearch/pull/18965)) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java index cd3317f269a02..74340b272f9f6 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java @@ -225,12 +225,13 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol final SortedNumericDocValues values = valuesSource.longValues(ctx);
final NumericDocValues singleton = DocValues.unwrapSingleton(values); - // If no subaggregations and index sorted on given field, we can use skip list based collector + // If index sorted on given field, we can use skip list based collector logger.trace("Index sort field found: {}, skipper: {}", fieldIndexSort, skipper); if (fieldIndexSort && skipper != null && singleton != null) { // TODO: add hard bounds support - if (hardBounds != null || sub == null || sub == LeafBucketCollector.NO_OP_COLLECTOR) { - return new HistogramSkiplistLeafCollector(singleton, skipper, preparedRounding, bucketOrds, this::incrementBucketDocCount); + // TODO: current assumes this is the top level + if (hardBounds == null && parent == null) { + return new HistogramSkiplistLeafCollector(singleton, skipper, preparedRounding, bucketOrds, sub, this); } } @@ -426,7 +427,8 @@ private static class HistogramSkiplistLeafCollector extends LeafBucketCollector private final DocValuesSkipper skipper; private final Rounding.Prepared preparedRounding; private final LongKeyedBucketOrds bucketOrds; - private final BiConsumer incrementDocCount; + private final LeafBucketCollector sub; + private final BucketsAggregator aggregator; /** * Max doc ID (inclusive) up to which all docs values may map to the same bucket. 
@@ -448,17 +450,23 @@ private static class HistogramSkiplistLeafCollector extends LeafBucketCollector DocValuesSkipper skipper, Rounding.Prepared preparedRounding, LongKeyedBucketOrds bucketOrds, - BiConsumer incrementDocCount + LeafBucketCollector sub, + BucketsAggregator aggregator ) { this.values = values; this.skipper = skipper; this.preparedRounding = preparedRounding; this.bucketOrds = bucketOrds; - this.incrementDocCount = incrementDocCount; + this.sub = sub; + this.aggregator = aggregator; } @Override - public void setScorer(Scorable scorer) throws IOException {} + public void setScorer(Scorable scorer) throws IOException { + if (sub != null) { + sub.setScorer(scorer); + } + } private void advanceSkipper(int doc) throws IOException { if (doc > skipper.maxDocID(0)) { @@ -485,6 +493,7 @@ private void advanceSkipper(int doc) throws IOException { // All docs at this level have a value, and all values map to the same bucket. upToInclusive = skipper.maxDocID(level); upToSameBucket = true; + // Use owningBucketOrd = 0 for top-level aggregation upToBucketIndex = bucketOrds.add(0, maxBucket); if (upToBucketIndex < 0) { upToBucketIndex = -1 - upToBucketIndex; @@ -497,27 +506,30 @@ private void advanceSkipper(int doc) throws IOException { @Override public void collect(int doc, long owningBucketOrd) throws IOException { - collect(doc); - } - - @Override - public void collect(int doc) throws IOException { if (doc > upToInclusive) { advanceSkipper(doc); } if (upToSameBucket) { - incrementDocCount.accept(upToBucketIndex, 1L); + aggregator.incrementBucketDocCount(upToBucketIndex, 1L); + sub.collect(doc, upToBucketIndex); } else if (values.advanceExact(doc)) { final long value = values.longValue(); - long bucketIndex = bucketOrds.add(0, preparedRounding.round(value)); + long bucketIndex = bucketOrds.add(owningBucketOrd, preparedRounding.round(value)); if (bucketIndex < 0) { bucketIndex = -1 - bucketIndex; + aggregator.collectExistingBucket(sub, doc, bucketIndex); + } else { 
+ aggregator.collectBucket(sub, doc, bucketIndex); } - incrementDocCount.accept(bucketIndex, 1L); } } + @Override + public void collect(int doc) throws IOException { + collect(doc, 0); + } + @Override public void collect(DocIdStream stream) throws IOException { for (;;) { @@ -526,9 +538,9 @@ public void collect(DocIdStream stream) throws IOException { upToExclusive = Integer.MAX_VALUE; } - if (upToSameBucket) { + if (upToSameBucket && sub == LeafBucketCollector.NO_OP_COLLECTOR) { long count = stream.count(upToExclusive); - incrementDocCount.accept(upToBucketIndex, count); + aggregator.incrementBucketDocCount(upToBucketIndex, count); } else { stream.forEach(upToExclusive, this::collect); } @@ -540,5 +552,6 @@ public void collect(DocIdStream stream) throws IOException { } } } + } } diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java index 4574a75a08aca..19a907b2e5515 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java @@ -61,6 +61,7 @@ import org.opensearch.index.mapper.DateFieldMapper; import org.opensearch.index.mapper.DocCountFieldMapper; import org.opensearch.index.mapper.MappedFieldType; +import org.opensearch.index.mapper.NumberFieldMapper; import org.opensearch.search.MultiValueMode; import org.opensearch.search.aggregations.AggregationBuilder; import org.opensearch.search.aggregations.BucketOrder; @@ -68,6 +69,7 @@ import org.opensearch.search.aggregations.MultiBucketConsumerService; import org.opensearch.search.aggregations.bucket.terms.StringTerms; import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.opensearch.search.aggregations.metrics.MaxAggregationBuilder; import 
org.opensearch.search.aggregations.pipeline.PipelineAggregator; import org.opensearch.search.aggregations.support.AggregationInspectionHelper; @@ -257,12 +259,7 @@ public void testAsSubAgg() throws IOException { public void testSkiplistWithSingleValueDates() throws IOException { // Create index settings with an index sort. - Settings settings = Settings.builder() - .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) - .putList("index.sort.field", AGGREGABLE_DATE) - .build(); + Settings settings = getSettingsWithIndexSort(); IndexMetadata indexMetadata = new IndexMetadata.Builder("index").settings(settings).build(); IndexSettings indexSettings = new IndexSettings(indexMetadata, settings); @@ -349,6 +346,139 @@ public void testSkiplistWithSingleValueDates() throws IOException { } + public void testSkiplistWithSingleValueDatesAndSubAggs() throws IOException { + // Create index settings with an index sort. 
+ Settings settings = getSettingsWithIndexSort(); + + IndexMetadata indexMetadata = new IndexMetadata.Builder("index").settings(settings).build(); + IndexSettings indexSettings = new IndexSettings(indexMetadata, settings); + + MappedFieldType dateType = new DateFieldMapper.DateFieldType(AGGREGABLE_DATE); + String categoryField = "category"; + NumberFieldMapper.NumberFieldType categoryType = new NumberFieldMapper.NumberFieldType( + categoryField, + NumberFieldMapper.NumberType.LONG + ); + + IndexNumericFieldData fieldData = (IndexNumericFieldData) dateType.fielddataBuilder("index", () -> { + throw new UnsupportedOperationException(); + }).build(null, null); + SortField sortField = fieldData.sortField(null, MultiValueMode.MIN, null, false); + try (Directory directory = newDirectory()) { + IndexWriterConfig config = newIndexWriterConfig(); + config.setMergePolicy(NoMergePolicy.INSTANCE); + config.setIndexSort(new Sort(sortField)); + String filterField = "type"; + try (IndexWriter indexWriter = new IndexWriter(directory, config)) { + + // First commit - 5 dates with type 1 + for (int i = 0; i < 5; i++) { + Document doc = new Document(); + long timestamp = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(DATASET.get(i))) + .toInstant() + .toEpochMilli(); + doc.add(SortedNumericDocValuesField.indexedField(AGGREGABLE_DATE, timestamp)); + doc.add(new LongPoint(filterField, 1)); + doc.add(new NumericDocValuesField(categoryField, i % 2)); // alternating categories + indexWriter.addDocument(doc); + } + indexWriter.commit(); + + // Second commit - 3 more dates with type 2, skiplist + for (int i = 5; i < 8; i++) { + Document doc = new Document(); + long timestamp = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(DATASET.get(i))) + .toInstant() + .toEpochMilli(); + doc.add(SortedNumericDocValuesField.indexedField(AGGREGABLE_DATE, timestamp)); + doc.add(new LongPoint(filterField, 2)); + doc.add(new NumericDocValuesField(categoryField, i 
% 2)); + indexWriter.addDocument(doc); + } + indexWriter.commit(); + + // Third commit - 2 more dates with type 2 + for (int i = 8; i < 10; i++) { + Document doc = new Document(); + long timestamp = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(DATASET.get(i))) + .toInstant() + .toEpochMilli(); + doc.add(SortedNumericDocValuesField.indexedField(AGGREGABLE_DATE, timestamp)); + doc.add(new LongPoint(filterField, 2)); + doc.add(new NumericDocValuesField(categoryField, i % 2)); + indexWriter.addDocument(doc); + } + indexWriter.commit(); + } + + try (IndexReader indexReader = DirectoryReader.open(directory)) { + IndexSearcher indexSearcher = newSearcher(indexReader, true, true); + + // Create date histogram with terms sub-aggregation + DateHistogramAggregationBuilder aggregationBuilder = new DateHistogramAggregationBuilder("test").field(AGGREGABLE_DATE) + .calendarInterval(DateHistogramInterval.YEAR) + .subAggregation(new MaxAggregationBuilder(categoryField).field(categoryField)); + + Query query = LongPoint.newExactQuery(filterField, 2); + + InternalDateHistogram histogram = searchAndReduce( + indexSettings, + indexSearcher, + query, + aggregationBuilder, + 1000, + false, + dateType, + categoryType + ); + + assertEquals(3, histogram.getBuckets().size()); // 2015, 2016, 2017 (only type 2 docs) + + // Verify first bucket (2015) with sub-aggregations + InternalDateHistogram.Bucket bucket2015 = (InternalDateHistogram.Bucket) histogram.getBuckets().get(0); + assertEquals("2015-01-01T00:00:00.000Z", bucket2015.getKeyAsString()); + assertEquals(3, bucket2015.getDocCount()); + + // Assert sub-aggregation values for 2015 bucket (docs 5,6,7 with categories 1,0,1) + assertNotNull("Sub-aggregation should exist for 2015 bucket", bucket2015.getAggregations()); + org.opensearch.search.aggregations.metrics.InternalMax maxAgg2015 = bucket2015.getAggregations().get(categoryField); + assertNotNull("Max sub-agg should exist", maxAgg2015); + assertEquals("Max 
category value for 2015 bucket should be 1", 1.0, maxAgg2015.getValue(), 0.0); + + // Verify second bucket (2016) + InternalDateHistogram.Bucket bucket2016 = (InternalDateHistogram.Bucket) histogram.getBuckets().get(1); + assertEquals("2016-01-01T00:00:00.000Z", bucket2016.getKeyAsString()); + assertEquals(1, bucket2016.getDocCount()); + + // Assert sub-aggregation values for 2016 bucket (doc 8 with category 0) + assertNotNull("Sub-aggregation should exist for 2016 bucket", bucket2016.getAggregations()); + org.opensearch.search.aggregations.metrics.InternalMax maxAgg2016 = bucket2016.getAggregations().get(categoryField); + assertNotNull("Max sub-agg should exist", maxAgg2016); + assertEquals("Max category value for 2016 bucket should be 0", 0.0, maxAgg2016.getValue(), 0.0); + + // Verify third bucket (2017) + InternalDateHistogram.Bucket bucket2017 = (InternalDateHistogram.Bucket) histogram.getBuckets().get(2); + assertEquals("2017-01-01T00:00:00.000Z", bucket2017.getKeyAsString()); + assertEquals(1, bucket2017.getDocCount()); + + // Assert sub-aggregation values for 2017 bucket (doc 9 with category 1) + assertNotNull("Sub-aggregation should exist for 2017 bucket", bucket2017.getAggregations()); + org.opensearch.search.aggregations.metrics.InternalMax maxAgg2017 = bucket2017.getAggregations().get(categoryField); + assertNotNull("Max sub-agg should exist", maxAgg2017); + assertEquals("Max category value for 2017 bucket should be 1", 1.0, maxAgg2017.getValue(), 0.0); + } + } + } + + private static Settings getSettingsWithIndexSort() { + return Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .putList("index.sort.field", AGGREGABLE_DATE) + .build(); + } + public void testNoDocsDeprecatedInterval() throws IOException { Query query = new MatchNoDocsQuery(); List dates = Collections.emptyList(); From 
d0c2eac4f11ce3e9837377751e381c546185244e Mon Sep 17 00:00:00 2001 From: Asim Mahmood Date: Tue, 30 Sep 2025 11:04:31 -0700 Subject: [PATCH 2/3] Fix sub agg Signed-off-by: Asim Mahmood --- .../histogram/DateHistogramAggregator.java | 33 +++-- .../DateHistogramAggregatorTests.java | 134 +++++++----------- 2 files changed, 75 insertions(+), 92 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java index 74340b272f9f6..200c7d3d479a8 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java @@ -225,11 +225,8 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol final SortedNumericDocValues values = valuesSource.longValues(ctx); final NumericDocValues singleton = DocValues.unwrapSingleton(values); - // If index sorted on given field, we can use skip list based collector - logger.trace("Index sort field found: {}, skipper: {}", fieldIndexSort, skipper); - if (fieldIndexSort && skipper != null && singleton != null) { + if (skipper != null && singleton != null) { // TODO: add hard bounds support - // TODO: current assumes this is the top level if (hardBounds == null && parent == null) { return new HistogramSkiplistLeafCollector(singleton, skipper, preparedRounding, bucketOrds, sub, this); } @@ -407,6 +404,7 @@ public void doClose() { public void collectDebugInfo(BiConsumer add) { add.accept("total_buckets", bucketOrds.size()); filterRewriteOptimizationContext.populateDebugInfo(add); + // TODO: add skiplist, single and multi-value usage } /** @@ -468,7 +466,7 @@ public void setScorer(Scorable scorer) throws IOException { } } - private void advanceSkipper(int doc) throws IOException { + private void 
advanceSkipper(int doc, long owningBucketOrd) throws IOException { if (doc > skipper.maxDocID(0)) { skipper.advance(doc); } @@ -493,8 +491,7 @@ private void advanceSkipper(int doc) throws IOException { // All docs at this level have a value, and all values map to the same bucket. upToInclusive = skipper.maxDocID(level); upToSameBucket = true; - // Use owningBucketOrd = 0 for top-level aggregation - upToBucketIndex = bucketOrds.add(0, maxBucket); + upToBucketIndex = bucketOrds.add(owningBucketOrd, maxBucket); if (upToBucketIndex < 0) { upToBucketIndex = -1 - upToBucketIndex; } @@ -507,7 +504,7 @@ private void advanceSkipper(int doc) throws IOException { @Override public void collect(int doc, long owningBucketOrd) throws IOException { if (doc > upToInclusive) { - advanceSkipper(doc); + advanceSkipper(doc, owningBucketOrd); } if (upToSameBucket) { @@ -532,21 +529,33 @@ public void collect(int doc) throws IOException { @Override public void collect(DocIdStream stream) throws IOException { + // This will only be called if it's the top agg for (;;) { int upToExclusive = upToInclusive + 1; if (upToExclusive < 0) { // overflow upToExclusive = Integer.MAX_VALUE; } - if (upToSameBucket && sub == LeafBucketCollector.NO_OP_COLLECTOR) { - long count = stream.count(upToExclusive); - aggregator.incrementBucketDocCount(upToBucketIndex, count); + if (upToSameBucket) { + if (sub == NO_OP_COLLECTOR) { + // stream.count may be faster when we don't need to handle sub-aggs + long count = stream.count(upToExclusive); + aggregator.incrementBucketDocCount(upToBucketIndex, count); + } else { + final int[] count = { 0 }; + stream.forEach(upToExclusive, doc -> { + sub.collect(doc, upToBucketIndex); + count[0]++; + }); + aggregator.incrementBucketDocCount(upToBucketIndex, count[0]); + } + } else { stream.forEach(upToExclusive, this::collect); } if (stream.mayHaveRemaining()) { - advanceSkipper(upToExclusive); + advanceSkipper(upToExclusive, 0); } else { break; } diff --git
a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java index 19a907b2e5515..2e06f9cece637 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java @@ -274,44 +274,7 @@ public void testSkiplistWithSingleValueDates() throws IOException { config.setMergePolicy(NoMergePolicy.INSTANCE); config.setIndexSort(new Sort(sortField)); String filterField = "type"; - try (IndexWriter indexWriter = new IndexWriter(directory, config)) { - - // First commit - 5 dates with type 1 - for (int i = 0; i < 5; i++) { - Document doc = new Document(); - long timestamp = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(DATASET.get(i))) - .toInstant() - .toEpochMilli(); - doc.add(SortedNumericDocValuesField.indexedField(AGGREGABLE_DATE, timestamp)); - doc.add(new LongPoint(filterField, 1)); - indexWriter.addDocument(doc); - } - indexWriter.commit(); - - // Second commit - 3 more dates with type 2, skiplist - for (int i = 5; i < 8; i++) { - Document doc = new Document(); - long timestamp = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(DATASET.get(i))) - .toInstant() - .toEpochMilli(); - doc.add(SortedNumericDocValuesField.indexedField(AGGREGABLE_DATE, timestamp)); - doc.add(new LongPoint(filterField, 2)); - indexWriter.addDocument(doc); - } - indexWriter.commit(); - - // Third commit - 3 more dates with type 2 - for (int i = 8; i < 10; i++) { - Document doc = new Document(); - long timestamp = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(DATASET.get(i))) - .toInstant() - .toEpochMilli(); - doc.add(SortedNumericDocValuesField.indexedField(AGGREGABLE_DATE, timestamp)); - doc.add(new 
LongPoint(filterField, 2)); - indexWriter.addDocument(doc); - } - indexWriter.commit(); - } + indexDocsForSkiplist(directory, config, filterField, null); try (IndexReader indexReader = DirectoryReader.open(directory)) { IndexSearcher indexSearcher = newSearcher(indexReader, true, true); @@ -330,7 +293,7 @@ public void testSkiplistWithSingleValueDates() throws IOException { false, fieldType ); - + System.out.println(histogram.getBuckets()); assertEquals(3, histogram.getBuckets().size()); // 2015, 2016, 2017 (only type 2 docs) assertEquals("2015-01-01T00:00:00.000Z", histogram.getBuckets().get(0).getKeyAsString()); @@ -369,47 +332,7 @@ public void testSkiplistWithSingleValueDatesAndSubAggs() throws IOException { config.setMergePolicy(NoMergePolicy.INSTANCE); config.setIndexSort(new Sort(sortField)); String filterField = "type"; - try (IndexWriter indexWriter = new IndexWriter(directory, config)) { - - // First commit - 5 dates with type 1 - for (int i = 0; i < 5; i++) { - Document doc = new Document(); - long timestamp = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(DATASET.get(i))) - .toInstant() - .toEpochMilli(); - doc.add(SortedNumericDocValuesField.indexedField(AGGREGABLE_DATE, timestamp)); - doc.add(new LongPoint(filterField, 1)); - doc.add(new NumericDocValuesField(categoryField, i % 2)); // alternating categories - indexWriter.addDocument(doc); - } - indexWriter.commit(); - - // Second commit - 3 more dates with type 2, skiplist - for (int i = 5; i < 8; i++) { - Document doc = new Document(); - long timestamp = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(DATASET.get(i))) - .toInstant() - .toEpochMilli(); - doc.add(SortedNumericDocValuesField.indexedField(AGGREGABLE_DATE, timestamp)); - doc.add(new LongPoint(filterField, 2)); - doc.add(new NumericDocValuesField(categoryField, i % 2)); - indexWriter.addDocument(doc); - } - indexWriter.commit(); - - // Third commit - 2 more dates with type 2 - for (int i = 8; i < 
10; i++) { - Document doc = new Document(); - long timestamp = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(DATASET.get(i))) - .toInstant() - .toEpochMilli(); - doc.add(SortedNumericDocValuesField.indexedField(AGGREGABLE_DATE, timestamp)); - doc.add(new LongPoint(filterField, 2)); - doc.add(new NumericDocValuesField(categoryField, i % 2)); - indexWriter.addDocument(doc); - } - indexWriter.commit(); - } + indexDocsForSkiplist(directory, config, filterField, categoryField); try (IndexReader indexReader = DirectoryReader.open(directory)) { IndexSearcher indexSearcher = newSearcher(indexReader, true, true); @@ -470,6 +393,57 @@ public void testSkiplistWithSingleValueDatesAndSubAggs() throws IOException { } } + private static void indexDocsForSkiplist(Directory directory, IndexWriterConfig config, String filterField, String categoryField) + throws IOException { + try (IndexWriter indexWriter = new IndexWriter(directory, config)) { + + // First commit - 5 dates with type 1 + for (int i = 0; i < 5; i++) { + Document doc = new Document(); + long timestamp = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(DATASET.get(i))) + .toInstant() + .toEpochMilli(); + doc.add(SortedNumericDocValuesField.indexedField(AGGREGABLE_DATE, timestamp)); + doc.add(new LongPoint(filterField, 1)); + if (categoryField != null) { + doc.add(new NumericDocValuesField(categoryField, i % 2)); + } + indexWriter.addDocument(doc); + } + indexWriter.commit(); + + // Second commit - 3 more dates with type 2, skiplist + for (int i = 5; i < 8; i++) { + Document doc = new Document(); + long timestamp = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(DATASET.get(i))) + .toInstant() + .toEpochMilli(); + doc.add(SortedNumericDocValuesField.indexedField(AGGREGABLE_DATE, timestamp)); + doc.add(new LongPoint(filterField, 2)); + if (categoryField != null) { + doc.add(new NumericDocValuesField(categoryField, i % 2)); + } + 
indexWriter.addDocument(doc); + } + indexWriter.commit(); + + // Third commit - 2 more dates with type 2 + for (int i = 8; i < 10; i++) { + Document doc = new Document(); + long timestamp = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(DATASET.get(i))) + .toInstant() + .toEpochMilli(); + doc.add(SortedNumericDocValuesField.indexedField(AGGREGABLE_DATE, timestamp)); + doc.add(new LongPoint(filterField, 2)); + if (categoryField != null) { + doc.add(new NumericDocValuesField(categoryField, i % 2)); + } + indexWriter.addDocument(doc); + } + indexWriter.commit(); + } + } + private static Settings getSettingsWithIndexSort() { return Settings.builder() .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) From f192a4ded663915a53f119cd0f91bb5965ac62b3 Mon Sep 17 00:00:00 2001 From: Asim Mahmood Date: Tue, 30 Sep 2025 11:59:48 -0700 Subject: [PATCH 3/3] Add debug collector usage Signed-off-by: Asim Mahmood --- .../histogram/DateHistogramAggregator.java | 16 +++++++++++++++- .../histogram/DateHistogramAggregatorTests.java | 1 - 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java index 200c7d3d479a8..a4e625bbfaad9 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java @@ -115,6 +115,12 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg private final String fieldName; private final boolean fieldIndexSort; + // Collector usage tracking fields + private int noOpCollectorsUsed; + private int singleValuedCollectorsUsed; + private int multiValuedCollectorsUsed; + private int skipListCollectorsUsed; + DateHistogramAggregator( String name, 
AggregatorFactories factories, @@ -215,6 +221,7 @@ protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws @Override public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { if (valuesSource == null) { + noOpCollectorsUsed++; return LeafBucketCollector.NO_OP_COLLECTOR; } @@ -228,12 +235,14 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol if (skipper != null && singleton != null) { // TODO: add hard bounds support if (hardBounds == null && parent == null) { + skipListCollectorsUsed++; return new HistogramSkiplistLeafCollector(singleton, skipper, preparedRounding, bucketOrds, sub, this); } } if (singleton != null) { // Optimized path for single-valued fields + singleValuedCollectorsUsed++; return new LeafBucketCollectorBase(sub, values) { @Override public void collect(int doc, long owningBucketOrd) throws IOException { @@ -246,6 +255,7 @@ public void collect(int doc, long owningBucketOrd) throws IOException { } // Original path for multi-valued fields + multiValuedCollectorsUsed++; return new LeafBucketCollectorBase(sub, values) { @Override public void collect(int doc, long owningBucketOrd) throws IOException { @@ -402,9 +412,13 @@ public void doClose() { @Override public void collectDebugInfo(BiConsumer add) { + super.collectDebugInfo(add); add.accept("total_buckets", bucketOrds.size()); filterRewriteOptimizationContext.populateDebugInfo(add); - // TODO: add skiplist, single and multi-value usage + add.accept("no_op_collectors_used", noOpCollectorsUsed); + add.accept("single_valued_collectors_used", singleValuedCollectorsUsed); + add.accept("multi_valued_collectors_used", multiValuedCollectorsUsed); + add.accept("skip_list_collectors_used", skipListCollectorsUsed); } /** diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java 
b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java index 2e06f9cece637..a34a3c774212a 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java @@ -293,7 +293,6 @@ public void testSkiplistWithSingleValueDates() throws IOException { false, fieldType ); - System.out.println(histogram.getBuckets()); assertEquals(3, histogram.getBuckets().size()); // 2015, 2016, 2017 (only type 2 docs) assertEquals("2015-01-01T00:00:00.000Z", histogram.getBuckets().get(0).getKeyAsString());