diff --git a/docs/changelog/137023.yaml b/docs/changelog/137023.yaml new file mode 100644 index 0000000000000..cab7620233562 --- /dev/null +++ b/docs/changelog/137023.yaml @@ -0,0 +1,5 @@ +pr: 137023 +summary: Support choosing the downsampling method in data stream lifecycle +area: "Data streams" +type: enhancement +issues: [] diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java index 776ee03928b27..5a998f43d0103 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java @@ -524,7 +524,10 @@ Set maybeExecuteDownsampling(ProjectState projectState, DataStream dataSt // - has matching downsample rounds // - is read-only // So let's wait for an in-progress downsampling operation to succeed or trigger the last matching round - affectedIndices.addAll(waitForInProgressOrTriggerDownsampling(dataStream, backingIndexMeta, downsamplingRounds, project)); + var downsamplingMethod = dataStream.getDataLifecycle().downsamplingMethod(); + affectedIndices.addAll( + waitForInProgressOrTriggerDownsampling(dataStream, backingIndexMeta, downsamplingRounds, downsamplingMethod, project) + ); } } @@ -541,6 +544,7 @@ private Set waitForInProgressOrTriggerDownsampling( DataStream dataStream, IndexMetadata backingIndex, List downsamplingRounds, + DownsampleConfig.SamplingMethod downsamplingMethod, ProjectMetadata project ) { assert dataStream.getIndices().contains(backingIndex.getIndex()) @@ -556,7 +560,7 @@ private Set waitForInProgressOrTriggerDownsampling( String downsampleIndexName = DownsampleConfig.generateDownsampleIndexName( DOWNSAMPLED_INDEX_PREFIX, backingIndex, - round.config().getFixedInterval() + round.fixedInterval() ); IndexMetadata targetDownsampleIndexMeta = project.index(downsampleIndexName); boolean targetDownsampleIndexExists = targetDownsampleIndexMeta != null; @@ -568,7 +572,8 @@ private Set waitForInProgressOrTriggerDownsampling( INDEX_DOWNSAMPLE_STATUS.get(targetDownsampleIndexMeta.getSettings()), round, lastRound, - index, + downsamplingMethod, + backingIndex, targetDownsampleIndexMeta.getIndex() ); if (downsamplingNotComplete.isEmpty() == false) { @@ -580,7 +585,7 @@ private Set waitForInProgressOrTriggerDownsampling( // no maintenance needed for previously started downsampling actions and we are on the last matching round so it's time // to kick off downsampling affectedIndices.add(index); - downsampleIndexOnce(round, project.id(), indexName, downsampleIndexName); + downsampleIndexOnce(round, downsamplingMethod, project.id(), backingIndex, downsampleIndexName); } } } @@ -592,16 +597,30 @@ private Set waitForInProgressOrTriggerDownsampling( */ private void downsampleIndexOnce( DataStreamLifecycle.DownsamplingRound round, + DownsampleConfig.SamplingMethod requestedDownsamplingMethod, ProjectId projectId, - String sourceIndex, + IndexMetadata sourceIndexMetadata, String downsampleIndexName ) { + // When an index is already downsampled with a method, we require all later downsampling rounds to use the same method. + // This is necessary to preserve the relation of the downsampled index to the raw data. For example, if an index is already + // downsampled and downsampled it again to 1 hour; we know that a document represents either the aggregated raw data of an hour + // or the last value of the raw data within this hour. If we mix the methods, we cannot derive any meaning from them. + // Furthermore, data stream lifecycle is configured on the data stream level and not on the individual index level, meaning that + // when a user changes downsampling method, some indices would not be able to be downsampled anymore. + // For this reason, when we encounter an already downsampled index, we use the source downsampling method which might be different + // from the requested one. + var sourceIndexSamplingMethod = DownsampleConfig.SamplingMethod.fromIndexMetadata(sourceIndexMetadata); + String sourceIndex = sourceIndexMetadata.getIndex().getName(); DownsampleAction.Request request = new DownsampleAction.Request( TimeValue.THIRTY_SECONDS /* TODO should this be longer/configurable? */, sourceIndex, downsampleIndexName, null, - round.config() + new DownsampleConfig( + round.fixedInterval(), + sourceIndexSamplingMethod == null ? requestedDownsamplingMethod : sourceIndexSamplingMethod + ) ); transportActionsDeduplicator.executeOnce( Tuple.tuple(projectId, request), @@ -632,11 +651,12 @@ private Set evaluateDownsampleStatus( IndexMetadata.DownsampleTaskStatus downsampleStatus, DataStreamLifecycle.DownsamplingRound currentRound, DataStreamLifecycle.DownsamplingRound lastRound, - Index backingIndex, + DownsampleConfig.SamplingMethod downsamplingMethod, + IndexMetadata backingIndex, Index downsampleIndex ) { Set affectedIndices = new HashSet<>(); - String indexName = backingIndex.getName(); + String indexName = backingIndex.getIndex().getName(); String downsampleIndexName = downsampleIndex.getName(); return switch (downsampleStatus) { case UNKNOWN -> { @@ -683,15 +703,15 @@ private Set evaluateDownsampleStatus( // NOTE that the downsample request is made through the deduplicator so it will only really be executed if // there isn't one already in-flight. This can happen if a previous request timed-out, failed, or there was a // master failover and data stream lifecycle needed to restart - downsampleIndexOnce(currentRound, projectId, indexName, downsampleIndexName); - affectedIndices.add(backingIndex); + downsampleIndexOnce(currentRound, downsamplingMethod, projectId, backingIndex, downsampleIndexName); + affectedIndices.add(backingIndex.getIndex()); yield affectedIndices; } case SUCCESS -> { if (dataStream.getIndices().contains(downsampleIndex) == false) { // at this point the source index is part of the data stream and the downsample index is complete but not // part of the data stream. we need to replace the source index with the downsample index in the data stream - affectedIndices.add(backingIndex); + affectedIndices.add(backingIndex.getIndex()); replaceBackingIndexWithDownsampleIndexOnce(projectId, dataStream, indexName, downsampleIndexName); } yield affectedIndices; diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestPutDataStreamLifecycleAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestPutDataStreamLifecycleAction.java index 2d975845fcd27..0d1cd1be1d95e 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestPutDataStreamLifecycleAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestPutDataStreamLifecycleAction.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.List; +import java.util.Set; import static org.elasticsearch.rest.RestRequest.Method.PUT; import static org.elasticsearch.rest.RestUtils.getAckTimeout; @@ -29,6 +30,9 @@ @ServerlessScope(Scope.PUBLIC) public class RestPutDataStreamLifecycleAction extends BaseRestHandler { + private static final String SUPPORTS_DOWNSAMPLING_METHOD = "dlm.downsampling_method"; + private static final Set CAPABILITIES = Set.of(SUPPORTS_DOWNSAMPLING_METHOD); + @Override public String getName() { return "put_data_lifecycles_action"; @@ -44,13 +48,14 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli try (XContentParser parser = request.contentParser()) { PutDataStreamLifecycleAction.Request putLifecycleRequest = PutDataStreamLifecycleAction.Request.parseRequest( parser, - (dataRetention, enabled, downsampling) -> new PutDataStreamLifecycleAction.Request( + (dataRetention, enabled, downsamplingRounds, downsamplingMethod) -> new PutDataStreamLifecycleAction.Request( getMasterNodeTimeout(request), getAckTimeout(request), Strings.splitStringByCommaToArray(request.param("name")), dataRetention, enabled, - downsampling + downsamplingRounds, + downsamplingMethod ) ); putLifecycleRequest.indicesOptions(IndicesOptions.fromRequest(request, putLifecycleRequest.indicesOptions())); @@ -61,4 +66,9 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli ); } } + + @Override + public Set supportedCapabilities() { + return CAPABILITIES; + } } diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/MetadataIndexTemplateServiceTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/MetadataIndexTemplateServiceTests.java index fd29a2087db86..574c30376137c 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/MetadataIndexTemplateServiceTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/MetadataIndexTemplateServiceTests.java @@ -43,6 +43,8 @@ import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.composeDataLifecycles; import static org.elasticsearch.common.settings.Settings.builder; import static org.elasticsearch.datastreams.MetadataDataStreamRolloverServiceTests.createSettingsProvider; +import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleFixtures.randomResettable; +import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleFixtures.randomSamplingMethod; import static org.elasticsearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -146,51 +148,63 @@ public void testLifecycleComposition() { List lifecycles = List.of(); assertThat(composeDataLifecycles(lifecycles), nullValue()); } - // One lifecycle results to this lifecycle as the final + // One lifecycle results in this lifecycle as the final { + ResettableValue> downsamplingRounds = randomDownsampling(); DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.createDataLifecycleTemplate( true, randomRetention(), - randomDownsampling() + downsamplingRounds, + randomResettable(() -> randomSamplingMethod(downsamplingRounds.get())) ); List lifecycles = List.of(lifecycle); DataStreamLifecycle result = composeDataLifecycles(lifecycles).build(); // Defaults to true assertThat(result.enabled(), equalTo(true)); assertThat(result.dataRetention(), equalTo(lifecycle.dataRetention().get())); - assertThat(result.downsampling(), equalTo(lifecycle.downsampling().get())); + assertThat(result.downsamplingRounds(), equalTo(lifecycle.downsamplingRounds().get())); + assertThat(result.downsamplingMethod(), equalTo(lifecycle.downsamplingMethod().get())); } // If the last lifecycle is missing a property (apart from enabled) we keep the latest from the previous ones // Enabled is always true unless it's explicitly set to false { + List downsamplingRounds = randomRounds(); DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.createDataLifecycleTemplate( false, randomPositiveTimeValue(), - randomRounds() + downsamplingRounds, + randomSamplingMethod(downsamplingRounds) ); List lifecycles = List.of(lifecycle, DataStreamLifecycle.Template.DATA_DEFAULT); DataStreamLifecycle result = composeDataLifecycles(lifecycles).build(); assertThat(result.enabled(), equalTo(true)); assertThat(result.dataRetention(), equalTo(lifecycle.dataRetention().get())); - assertThat(result.downsampling(), equalTo(lifecycle.downsampling().get())); + assertThat(result.downsamplingRounds(), equalTo(lifecycle.downsamplingRounds().get())); + assertThat(result.downsamplingMethod(), equalTo(lifecycle.downsamplingMethod().get())); } - // If both lifecycle have all properties, then the latest one overwrites all the others + // If both lifecycles have all properties, then the latest one overwrites all the others { + DownsampleConfig.SamplingMethod downsamplingMethod1 = randomFrom(DownsampleConfig.SamplingMethod.LAST_VALUE); DataStreamLifecycle.Template lifecycle1 = DataStreamLifecycle.createDataLifecycleTemplate( false, randomPositiveTimeValue(), - randomRounds() + randomRounds(), + downsamplingMethod1 ); DataStreamLifecycle.Template lifecycle2 = DataStreamLifecycle.createDataLifecycleTemplate( true, randomPositiveTimeValue(), - randomRounds() + randomRounds(), + downsamplingMethod1 == DownsampleConfig.SamplingMethod.LAST_VALUE + ? DownsampleConfig.SamplingMethod.AGGREGATE + : DownsampleConfig.SamplingMethod.LAST_VALUE ); List lifecycles = List.of(lifecycle1, lifecycle2); DataStreamLifecycle result = composeDataLifecycles(lifecycles).build(); assertThat(result.enabled(), equalTo(lifecycle2.enabled())); assertThat(result.dataRetention(), equalTo(lifecycle2.dataRetention().get())); - assertThat(result.downsampling(), equalTo(lifecycle2.downsampling().get())); + assertThat(result.downsamplingRounds(), equalTo(lifecycle2.downsamplingRounds().get())); + assertThat(result.downsamplingMethod(), equalTo(lifecycle2.downsamplingMethod().get())); } } @@ -255,7 +269,7 @@ private static List randomRounds() { List rounds = new ArrayList<>(); var previous = new DataStreamLifecycle.DownsamplingRound( TimeValue.timeValueDays(randomIntBetween(1, 365)), - new DownsampleConfig(new DateHistogramInterval(randomIntBetween(1, 24) + "h")) + new DateHistogramInterval(randomIntBetween(1, 24) + "h") ); rounds.add(previous); for (int i = 0; i < count; i++) { @@ -268,9 +282,7 @@ private static List randomRounds() { private static DataStreamLifecycle.DownsamplingRound nextRound(DataStreamLifecycle.DownsamplingRound previous) { var after = TimeValue.timeValueDays(previous.after().days() + randomIntBetween(1, 10)); - var fixedInterval = new DownsampleConfig( - new DateHistogramInterval((previous.config().getFixedInterval().estimateMillis() * randomIntBetween(2, 5)) + "ms") - ); + var fixedInterval = new DateHistogramInterval((previous.fixedInterval().estimateMillis() * randomIntBetween(2, 5)) + "ms"); return new DataStreamLifecycle.DownsamplingRound(after, fixedInterval); } diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsResponseTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsResponseTests.java index 8f6605cefae9f..10b772249e436 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsResponseTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsResponseTests.java @@ -154,7 +154,7 @@ public void testResponseIlmAndDataStreamLifecycleRepresentation() throws Excepti .setGeneration(3) .setAllowCustomRouting(true) .setIndexMode(IndexMode.STANDARD) - .setLifecycle(DataStreamLifecycle.createDataLifecycle(false, null, null)) + .setLifecycle(DataStreamLifecycle.createDataLifecycle(false, null, null, null)) .setDataStreamOptions(DataStreamOptions.FAILURE_STORE_ENABLED) .setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureStores).build()) .build(); diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleFixtures.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleFixtures.java index 163bff3dfd2cb..c0625e34f3153 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleFixtures.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleFixtures.java @@ -38,7 +38,9 @@ import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.newInstance; import static org.elasticsearch.test.ESIntegTestCase.client; +import static org.elasticsearch.test.ESTestCase.between; import static org.elasticsearch.test.ESTestCase.frequently; +import static org.elasticsearch.test.ESTestCase.randomFrom; import static org.elasticsearch.test.ESTestCase.randomIntBetween; import static org.junit.Assert.assertTrue; @@ -143,14 +145,18 @@ static void putComposableIndexTemplate( } static DataStreamLifecycle.Template randomDataLifecycleTemplate() { + ResettableValue> downsampling = randomResettable( + DataStreamLifecycleFixtures::randomDownsamplingRounds + ); return DataStreamLifecycle.createDataLifecycleTemplate( frequently(), randomResettable(ESTestCase::randomTimeValue), - randomResettable(DataStreamLifecycleFixtures::randomDownsamplingRounds) + downsampling, + randomResettable(() -> randomSamplingMethod(downsampling.get())) ); } - private static ResettableValue randomResettable(Supplier supplier) { + public static ResettableValue randomResettable(Supplier supplier) { return switch (randomIntBetween(0, 2)) { case 0 -> ResettableValue.undefined(); case 1 -> ResettableValue.reset(); @@ -164,7 +170,7 @@ private static List randomDownsamplingRou List rounds = new ArrayList<>(); var previous = new DataStreamLifecycle.DownsamplingRound( TimeValue.timeValueDays(randomIntBetween(1, 365)), - new DownsampleConfig(new DateHistogramInterval(randomIntBetween(1, 24) + "h")) + new DateHistogramInterval(randomIntBetween(1, 24) + "h") ); rounds.add(previous); for (int i = 0; i < count; i++) { @@ -177,9 +183,19 @@ private static List randomDownsamplingRou private static DataStreamLifecycle.DownsamplingRound nextRound(DataStreamLifecycle.DownsamplingRound previous) { var after = TimeValue.timeValueDays(previous.after().days() + randomIntBetween(1, 10)); - var fixedInterval = new DownsampleConfig( - new DateHistogramInterval((previous.config().getFixedInterval().estimateMillis() * randomIntBetween(2, 5)) + "ms") - ); + var fixedInterval = new DateHistogramInterval((previous.fixedInterval().estimateMillis() * randomIntBetween(2, 5)) + "ms"); return new DataStreamLifecycle.DownsamplingRound(after, fixedInterval); } + + /** + * In order to produce valid data stream lifecycle configurations, the sampling method can be defined only when + * the downsampling rounds are also defined. + */ + public static DownsampleConfig.SamplingMethod randomSamplingMethod(List downsamplingRounds) { + if (downsamplingRounds == null || between(0, DownsampleConfig.SamplingMethod.values().length) == 0) { + return null; + } else { + return randomFrom(DownsampleConfig.SamplingMethod.values()); + } + } } diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java index 0ce32cf1f94ab..35c709462d5d9 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java @@ -1228,9 +1228,7 @@ public void testDownsampling() throws Exception { .put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES) .put("index.routing_path", "@timestamp"), DataStreamLifecycle.dataLifecycleBuilder() - .downsampling( - List.of(new DownsamplingRound(TimeValue.timeValueMillis(0), new DownsampleConfig(new DateHistogramInterval("5m")))) - ) + .downsamplingRounds(List.of(new DownsamplingRound(TimeValue.timeValueMillis(0), new DateHistogramInterval("5m")))) .dataRetention(TimeValue.MAX_VALUE) .build(), now @@ -1377,9 +1375,7 @@ public void testDownsamplingWhenTargetIndexNameClashYieldsException() throws Exc .put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES) .put("index.routing_path", "@timestamp"), DataStreamLifecycle.dataLifecycleBuilder() - .downsampling( - List.of(new DownsamplingRound(TimeValue.timeValueMillis(0), new DownsampleConfig(new DateHistogramInterval("5m")))) - ) + .downsamplingRounds(List.of(new DownsamplingRound(TimeValue.timeValueMillis(0), new DateHistogramInterval("5m")))) .dataRetention(TimeValue.MAX_VALUE) .build(), now @@ -1662,9 +1658,7 @@ private ClusterState downsampleSetup(ProjectId projectId, String dataStreamName, settings(IndexVersion.current()).put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES) .put("index.routing_path", "@timestamp"), DataStreamLifecycle.dataLifecycleBuilder() - .downsampling( - List.of(new DownsamplingRound(TimeValue.timeValueMillis(0), new DownsampleConfig(new DateHistogramInterval("5m")))) - ) + .downsamplingRounds(List.of(new DownsamplingRound(TimeValue.timeValueMillis(0), new DateHistogramInterval("5m")))) .dataRetention(TimeValue.timeValueMillis(1)) .build(), now diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/20_basic.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/20_basic.yml index 4bf6ccfbfa7ce..56c8f764a3d2d 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/20_basic.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/20_basic.yml @@ -171,6 +171,58 @@ teardown: - match: { data_streams.1.lifecycle.downsampling.1.after: '100d'} - match: { data_streams.1.lifecycle.downsampling.1.fixed_interval: '10h'} +--- +"Put data stream lifecycle with downsampling method": + - requires: + capabilities: + - method: PUT + path: /_data_stream/{name}/_lifecycle + capabilities: [ "dlm.downsampling_method" ] + test_runner_features: [ "capabilities" ] + reason: "Downsampling method was added to data stream lifecycle was available from 9.3" + + - do: + indices.put_data_lifecycle: + name: "*" + body: > + { + "downsampling": [ + { + "after": "10d", + "fixed_interval": "1h" + }, + { + "after": "100d", + "fixed_interval": "10h" + } + ], + "downsampling_method": "aggregate", + "data_retention": "30d", + "enabled": false + } + + - is_true: acknowledged + + - do: + indices.get_data_lifecycle: + name: "*" + - length: { data_streams: 2 } + - match: { data_streams.0.name: data-stream-with-lifecycle } + - match: { data_streams.0.lifecycle.data_retention: "30d" } + - match: { data_streams.0.lifecycle.enabled: false} + - match: { data_streams.0.lifecycle.downsampling.0.after: '10d'} + - match: { data_streams.0.lifecycle.downsampling.0.fixed_interval: '1h'} + - match: { data_streams.0.lifecycle.downsampling.1.after: '100d'} + - match: { data_streams.0.lifecycle.downsampling.1.fixed_interval: '10h'} + - match: { data_streams.1.name: simple-data-stream1 } + - match: { data_streams.1.lifecycle.data_retention: "30d" } + - match: { data_streams.1.lifecycle.enabled: false} + - match: { data_streams.1.lifecycle.downsampling_method: 'aggregate'} + - match: { data_streams.1.lifecycle.downsampling.0.after: '10d'} + - match: { data_streams.1.lifecycle.downsampling.0.fixed_interval: '1h'} + - match: { data_streams.1.lifecycle.downsampling.1.after: '100d'} + - match: { data_streams.1.lifecycle.downsampling.1.fixed_interval: '10h'} + --- "Enable lifecycle": diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cluster.component_template/10_basic.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cluster.component_template/10_basic.yml index 800dec2a795a4..c230371cc4003 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cluster.component_template/10_basic.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cluster.component_template/10_basic.yml @@ -142,6 +142,38 @@ - match: {component_templates.0.component_template.template.lifecycle.enabled: true} - match: {component_templates.0.component_template.template.lifecycle.data_retention: "10d"} +--- +"Add data stream lifecycle with downsampling": + - requires: + capabilities: + - method: PUT + path: /_component_template/{name} + capabilities: [ "dlm.downsampling_method" ] + test_runner_features: [ "capabilities" ] + reason: "Downsampling method was added to data stream lifecycle was available from 9.3" + + - do: + cluster.put_component_template: + name: test-lifecycle + body: + template: + lifecycle: + data_retention: "10d" + downsampling_method: last_value + downsampling: + - {"after": "1d", "fixed_interval": "5m"} + + - do: + cluster.get_component_template: + name: test-lifecycle + + - match: {component_templates.0.name: test-lifecycle} + - match: {component_templates.0.component_template.template.lifecycle.enabled: true} + - match: {component_templates.0.component_template.template.lifecycle.data_retention: "10d"} + - match: {component_templates.0.component_template.template.lifecycle.downsampling_method: "last_value"} + - match: {component_templates.0.component_template.template.lifecycle.downsampling.0.after: "1d"} + - match: {component_templates.0.component_template.template.lifecycle.downsampling.0.fixed_interval: "5m"} + --- "Get data stream lifecycle with default rollover": - requires: diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.put_index_template/10_basic.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.put_index_template/10_basic.yml index 81068f460b2b3..b538293d668ea 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.put_index_template/10_basic.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.put_index_template/10_basic.yml @@ -197,3 +197,37 @@ name: baz - match: {index_templates.0.name: baz} + +--- +"Add data stream lifecycle with downsampling": + - requires: + capabilities: + - method: PUT + path: /_index_template/{name} + capabilities: [ "dlm.downsampling_method" ] + test_runner_features: [ "capabilities" ] + reason: "Downsampling method was added to data stream lifecycle was available from 9.3" + + - do: + indices.put_index_template: + name: test-lifecycle + body: + index_patterns: downsampling-* + data_stream: {} + template: + lifecycle: + data_retention: "10d" + downsampling_method: last_value + downsampling: + - {"after": "1d", "fixed_interval": "5m"} + + - do: + indices.get_index_template: + name: test-lifecycle + + - match: {index_templates.0.name: test-lifecycle} + - match: {index_templates.0.index_template.template.lifecycle.enabled: true} + - match: {index_templates.0.index_template.template.lifecycle.data_retention: "10d"} + - match: {index_templates.0.index_template.template.lifecycle.downsampling_method: "last_value"} + - match: {index_templates.0.index_template.template.lifecycle.downsampling.0.after: "1d"} + - match: {index_templates.0.index_template.template.lifecycle.downsampling.0.fixed_interval: "5m"} diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/lifecycle/PutDataStreamLifecycleAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/lifecycle/PutDataStreamLifecycleAction.java index 45b71fd63cb16..b293f195d1864 100644 --- a/server/src/main/java/org/elasticsearch/action/datastreams/lifecycle/PutDataStreamLifecycleAction.java +++ b/server/src/main/java/org/elasticsearch/action/datastreams/lifecycle/PutDataStreamLifecycleAction.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionType; import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.downsample.DownsampleConfig; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -33,6 +34,7 @@ import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.DATA_RETENTION_FIELD; import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.DOWNSAMPLING_FIELD; +import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.DOWNSAMPLING_METHOD_FIELD; import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.ENABLED_FIELD; /** @@ -50,7 +52,8 @@ public interface Factory { Request create( @Nullable TimeValue dataRetention, @Nullable Boolean enabled, - @Nullable List downsampling + @Nullable List downsampling, + @Nullable DownsampleConfig.SamplingMethod downsamplingMethod ); } @@ -58,7 +61,12 @@ Request create( public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "put_data_stream_lifecycle_request", false, - (args, factory) -> factory.create((TimeValue) args[0], (Boolean) args[1], (List) args[2]) + (args, factory) -> factory.create( + (TimeValue) args[0], + (Boolean) args[1], + (List) args[2], + (DownsampleConfig.SamplingMethod) args[3] + ) ); static { @@ -75,6 +83,12 @@ Request create( DOWNSAMPLING_FIELD, ObjectParser.ValueType.OBJECT_ARRAY ); + PARSER.declareField( + ConstructingObjectParser.optionalConstructorArg(), + (p, c) -> DownsampleConfig.SamplingMethod.fromString(p.text()), + DOWNSAMPLING_METHOD_FIELD, + ObjectParser.ValueType.STRING + ); } public static Request parseRequest(XContentParser parser, Factory factory) { @@ -120,7 +134,7 @@ public void writeTo(StreamOutput out) throws IOException { } public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, String[] names, @Nullable TimeValue dataRetention) { - this(masterNodeTimeout, ackTimeout, names, dataRetention, null, null); + this(masterNodeTimeout, ackTimeout, names, dataRetention, null); } public Request(TimeValue masterNodeTimeout, TimeValue ackTimeout, String[] names, DataStreamLifecycle lifecycle) { @@ -136,7 +150,7 @@ public Request( @Nullable TimeValue dataRetention, @Nullable Boolean enabled ) { - this(masterNodeTimeout, ackTimeout, names, dataRetention, enabled, null); + this(masterNodeTimeout, ackTimeout, names, dataRetention, enabled, null, null); } public Request( @@ -145,14 +159,16 @@ public Request( String[] names, @Nullable TimeValue dataRetention, @Nullable Boolean enabled, - @Nullable List downsampling + @Nullable List downsamplingRounds, + @Nullable DownsampleConfig.SamplingMethod downsamplingMethod ) { super(masterNodeTimeout, ackTimeout); this.names = names; this.lifecycle = DataStreamLifecycle.dataLifecycleBuilder() .dataRetention(dataRetention) .enabled(enabled == null || enabled) - .downsampling(downsampling) + .downsamplingRounds(downsamplingRounds) + .downsamplingMethod(downsamplingMethod) .build(); } diff --git a/server/src/main/java/org/elasticsearch/action/downsample/DownsampleConfig.java b/server/src/main/java/org/elasticsearch/action/downsample/DownsampleConfig.java index 444785d55e3d4..ae08245ceb7c9 100644 --- a/server/src/main/java/org/elasticsearch/action/downsample/DownsampleConfig.java +++ b/server/src/main/java/org/elasticsearch/action/downsample/DownsampleConfig.java @@ -101,17 +101,6 @@ public class DownsampleConfig implements NamedWriteable, ToXContentObject { ); } - /** - * Create a new {@link DownsampleConfig} using the given configuration parameters. - * @param fixedInterval the fixed interval to use for computing the date histogram for the rolled up documents (required). - * @deprecated please use {@link DownsampleConfig#DownsampleConfig(DateHistogramInterval, SamplingMethod)}, this method is being kept - * until the sampling method is completely integrated with ILM and DLM. - */ - @Deprecated - public DownsampleConfig(final DateHistogramInterval fixedInterval) { - this(fixedInterval, null); - } - /** * Create a new {@link DownsampleConfig} using the given configuration parameters. * @param fixedInterval the fixed interval to use for computing the date histogram for the rolled up documents (required). @@ -144,25 +133,28 @@ public DownsampleConfig(final StreamInput in) throws IOException { * - The target interval needs to be a multiple of the source interval * throws an IllegalArgumentException to signal that the target interval is not acceptable */ - public static void validateSourceAndTargetIntervals(DownsampleConfig source, DownsampleConfig target) { - long sourceMillis = source.fixedInterval.estimateMillis(); - long targetMillis = target.fixedInterval.estimateMillis(); + public static void validateSourceAndTargetIntervals( + DateHistogramInterval sourceFxedInterval, + DateHistogramInterval targetFixedInterval + ) { + long sourceMillis = sourceFxedInterval.estimateMillis(); + long targetMillis = targetFixedInterval.estimateMillis(); if (sourceMillis >= targetMillis) { // Downsampling interval must be greater than source interval throw new IllegalArgumentException( "Downsampling interval [" - + target.fixedInterval + + targetFixedInterval + "] must be greater than the source index interval [" - + source.fixedInterval + + sourceFxedInterval + "]." ); } else if (targetMillis % sourceMillis != 0) { // Downsampling interval must be a multiple of the source interval throw new IllegalArgumentException( "Downsampling interval [" - + target.fixedInterval + + targetFixedInterval + "] must be a multiple of the source index interval [" - + source.fixedInterval + + sourceFxedInterval + "]." ); } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 9e7fcb1e60f16..93be5259e29f6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -1247,7 +1247,7 @@ public List getDownsamplingRoundsFor( LongSupplier nowSupplier ) { assert backingIndices.indices.contains(index) : "the provided index must be a backing index for this datastream"; - if (lifecycle == null || lifecycle.downsampling() == null) { + if (lifecycle == null || lifecycle.downsamplingRounds() == null) { return List.of(); } @@ -1260,8 +1260,8 @@ public List getDownsamplingRoundsFor( if (indexGenerationTime != null) { long nowMillis = nowSupplier.getAsLong(); long indexGenerationTimeMillis = indexGenerationTime.millis(); - List orderedRoundsForIndex = new ArrayList<>(lifecycle.downsampling().size()); - for (DownsamplingRound round : lifecycle.downsampling()) { + List orderedRoundsForIndex = new ArrayList<>(lifecycle.downsamplingRounds().size()); + for (DownsamplingRound round : lifecycle.downsamplingRounds()) { if (nowMillis >= indexGenerationTimeMillis + round.after().getMillis()) { orderedRoundsForIndex.add(round); } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamLifecycle.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamLifecycle.java index 7e60ddf0818d1..bb87ab1994ace 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamLifecycle.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamLifecycle.java @@ -52,12 +52,13 @@ * Lifecycle supports the following configurations: * - enabled, applicable to data and failures * - data retention, applicable to data and failures - * - downsampling, applicable only to data + * - downsampling and downsampling method, applicable only to data */ public class DataStreamLifecycle implements SimpleDiffable, ToXContentObject { // Versions over the wire public static final TransportVersion ADDED_ENABLED_FLAG_VERSION = TransportVersions.V_8_10_X; + public static final TransportVersion ADD_SAMPLE_METHOD_DOWNSAMPLE_DLM = TransportVersion.fromName("add_sample_method_downsample_dlm"); public static final String EFFECTIVE_RETENTION_REST_API_CAPABILITY = "data_stream_lifecycle_effective_retention"; public static final String DATA_STREAMS_LIFECYCLE_ONLY_SETTING_NAME = "data_streams.lifecycle_only.mode"; @@ -74,6 +75,8 @@ public class DataStreamLifecycle implements SimpleDiffable, "Failure store lifecycle does not support downsampling, please remove the downsampling configuration."; private static final TransportVersion INTRODUCE_LIFECYCLE_TEMPLATE = TransportVersion.fromName("introduce_lifecycle_template"); + public static final String DOWNSAMPLING_METHOD_WITHOUT_ROUNDS_ERROR = + "Downsampling method can only be set when there is at least one downsampling round."; /** * Check if {@link #DATA_STREAMS_LIFECYCLE_ONLY_SETTING_NAME} is present and set to {@code true}, indicating that @@ -94,7 +97,7 @@ public static boolean isDataStreamsLifecycleOnlyMode(final Settings settings) { Setting.Property.NodeScope ); - public static final DataStreamLifecycle DEFAULT_DATA_LIFECYCLE = DataStreamLifecycle.createDataLifecycle(null, null, null); + public static final DataStreamLifecycle DEFAULT_DATA_LIFECYCLE = DataStreamLifecycle.createDataLifecycle(null, null, null, null); public static final DataStreamLifecycle DEFAULT_FAILURE_LIFECYCLE = DataStreamLifecycle.createFailuresLifecycle(null, null); public static final String DATA_STREAM_LIFECYCLE_ORIGIN = "data_stream_lifecycle"; @@ -104,13 +107,20 @@ public static boolean isDataStreamsLifecycleOnlyMode(final Settings settings) { public static final ParseField EFFECTIVE_RETENTION_FIELD = new ParseField("effective_retention"); public static final ParseField RETENTION_SOURCE_FIELD = new ParseField("retention_determined_by"); public static final ParseField DOWNSAMPLING_FIELD = new ParseField("downsampling"); + public static final ParseField DOWNSAMPLING_METHOD_FIELD = new ParseField("downsampling_method"); private static final ParseField ROLLOVER_FIELD = new ParseField("rollover"); @SuppressWarnings("unchecked") public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "lifecycle", false, - (args, lt) -> new DataStreamLifecycle(lt, (Boolean) args[0], (TimeValue) args[1], (List) args[2]) + (args, lt) -> new DataStreamLifecycle( + lt, + (Boolean) args[0], + (TimeValue) args[1], + (List) args[2], + (DownsampleConfig.SamplingMethod) args[3] + ) ); static { @@ -133,6 +143,12 @@ public static boolean isDataStreamsLifecycleOnlyMode(final Settings settings) { return AbstractObjectParser.parseArray(p, null, DownsamplingRound::fromXContent); } }, DOWNSAMPLING_FIELD, ObjectParser.ValueType.OBJECT_ARRAY_OR_NULL); + PARSER.declareField( + ConstructingObjectParser.optionalConstructorArg(), + (p, c) -> DownsampleConfig.SamplingMethod.fromString(p.text()), + DOWNSAMPLING_METHOD_FIELD, + ObjectParser.ValueType.STRING + ); } private static final TransportVersion INTRODUCE_FAILURES_LIFECYCLE = TransportVersion.fromName("introduce_failures_lifecycle"); @@ -142,26 +158,34 @@ public static boolean isDataStreamsLifecycleOnlyMode(final Settings settings) { @Nullable private final TimeValue dataRetention; @Nullable - private final List downsampling; + private final List downsamplingRounds; + @Nullable + private final DownsampleConfig.SamplingMethod downsamplingMethod; /** - * This constructor is visible for testing, please use {@link DataStreamLifecycle#createDataLifecycle(Boolean, TimeValue, List)} or + * This constructor is visible for testing, please use + * {@link DataStreamLifecycle#createDataLifecycle(Boolean, TimeValue, List, DownsampleConfig.SamplingMethod)} or * {@link DataStreamLifecycle#createFailuresLifecycle(Boolean, TimeValue)}. */ DataStreamLifecycle( LifecycleType lifecycleType, @Nullable Boolean enabled, @Nullable TimeValue dataRetention, - @Nullable List downsampling + @Nullable List downsamplingRounds, + @Nullable DownsampleConfig.SamplingMethod downsamplingMethod ) { this.lifecycleType = lifecycleType; this.enabled = enabled == null || enabled; this.dataRetention = dataRetention; - if (lifecycleType == LifecycleType.FAILURES && downsampling != null) { + if (lifecycleType == LifecycleType.FAILURES && downsamplingRounds != null) { throw new IllegalArgumentException(DOWNSAMPLING_NOT_SUPPORTED_ERROR_MESSAGE); } - DownsamplingRound.validateRounds(downsampling); - this.downsampling = downsampling; + DownsamplingRound.validateRounds(downsamplingRounds); + this.downsamplingRounds = downsamplingRounds; + if (downsamplingMethod != null && downsamplingRounds == null) { + throw new IllegalArgumentException(DOWNSAMPLING_METHOD_WITHOUT_ROUNDS_ERROR); + } + this.downsamplingMethod = downsamplingMethod; } /** @@ -171,9 +195,10 @@ public static boolean isDataStreamsLifecycleOnlyMode(final Settings settings) { public static DataStreamLifecycle createDataLifecycle( @Nullable Boolean enabled, @Nullable TimeValue dataRetention, - @Nullable List downsampling + @Nullable List downsamplingRounds, + @Nullable DownsampleConfig.SamplingMethod downsamplingMethod ) { - return new DataStreamLifecycle(LifecycleType.DATA, enabled, dataRetention, downsampling); + return new DataStreamLifecycle(LifecycleType.DATA, enabled, dataRetention, downsamplingRounds, downsamplingMethod); } /** @@ -181,7 +206,7 @@ public static DataStreamLifecycle createDataLifecycle( * means it supports only enabling and retention. */ public static DataStreamLifecycle createFailuresLifecycle(@Nullable Boolean enabled, @Nullable TimeValue dataRetention) { - return new DataStreamLifecycle(LifecycleType.FAILURES, enabled, dataRetention, null); + return new DataStreamLifecycle(LifecycleType.FAILURES, enabled, dataRetention, null, null); } /** @@ -304,11 +329,19 @@ public void addWarningHeaderIfDataRetentionNotEffective( /** * The configured downsampling rounds with the `after` and the `fixed_interval` per round. If downsampling is - * not configured then it returns null. + * not configured, then it returns null. + */ + @Nullable + public List downsamplingRounds() { + return downsamplingRounds; + } + + /** + * The configured downsampling method. If downsampling is not configured, then it returns null. */ @Nullable - public List downsampling() { - return downsampling; + public DownsampleConfig.SamplingMethod downsamplingMethod() { + return downsamplingMethod; } @Override @@ -319,13 +352,14 @@ public boolean equals(Object o) { final DataStreamLifecycle that = (DataStreamLifecycle) o; return lifecycleType == that.lifecycleType && Objects.equals(dataRetention, that.dataRetention) - && Objects.equals(downsampling, that.downsampling) + && Objects.equals(downsamplingRounds, that.downsamplingRounds) + && Objects.equals(downsamplingMethod, that.downsamplingMethod) && enabled == that.enabled; } @Override public int hashCode() { - return Objects.hash(lifecycleType, enabled, dataRetention, downsampling); + return Objects.hash(lifecycleType, enabled, dataRetention, downsamplingRounds, downsamplingMethod); } @Override @@ -340,15 +374,18 @@ public void writeTo(StreamOutput out) throws IOException { } if (out.getTransportVersion().onOrAfter(ADDED_ENABLED_FLAG_VERSION)) { if (out.getTransportVersion().supports(INTRODUCE_LIFECYCLE_TEMPLATE)) { - out.writeOptionalCollection(downsampling); + out.writeOptionalCollection(downsamplingRounds); } else { - writeLegacyOptionalValue(downsampling, out, StreamOutput::writeCollection); + writeLegacyOptionalValue(downsamplingRounds, out, StreamOutput::writeCollection); } out.writeBoolean(enabled()); } if (out.getTransportVersion().supports(INTRODUCE_FAILURES_LIFECYCLE)) { lifecycleType.writeTo(out); } + if (out.getTransportVersion().supports(ADD_SAMPLE_METHOD_DOWNSAMPLE_DLM)) { + out.writeOptionalWriteable(downsamplingMethod); + } } public DataStreamLifecycle(StreamInput in) throws IOException { @@ -363,16 +400,19 @@ public DataStreamLifecycle(StreamInput in) throws IOException { } if (in.getTransportVersion().onOrAfter(ADDED_ENABLED_FLAG_VERSION)) { if (in.getTransportVersion().supports(INTRODUCE_LIFECYCLE_TEMPLATE)) { - downsampling = in.readOptionalCollectionAsList(DownsamplingRound::read); + downsamplingRounds = in.readOptionalCollectionAsList(DownsamplingRound::read); } else { - downsampling = readLegacyOptionalValue(in, is -> is.readCollectionAsList(DownsamplingRound::read)); + downsamplingRounds = readLegacyOptionalValue(in, is -> is.readCollectionAsList(DownsamplingRound::read)); } enabled = in.readBoolean(); } else { - downsampling = null; + downsamplingRounds = null; enabled = true; } lifecycleType = in.getTransportVersion().supports(INTRODUCE_FAILURES_LIFECYCLE) ? LifecycleType.read(in) : LifecycleType.DATA; + downsamplingMethod = in.getTransportVersion().supports(ADD_SAMPLE_METHOD_DOWNSAMPLE_DLM) + ? in.readOptionalWriteable(DownsampleConfig.SamplingMethod::read) + : null; } /** @@ -425,8 +465,10 @@ public String toString() { + enabled + ", dataRetention=" + dataRetention - + ", downsampling=" - + downsampling + + ", downsamplingRounds=" + + downsamplingRounds + + ", downsamplingMethod=" + + downsamplingMethod + '}'; } @@ -465,8 +507,11 @@ public XContentBuilder toXContent( } } - if (downsampling != null) { - builder.field(DOWNSAMPLING_FIELD.getPreferredName(), downsampling); + if (downsamplingRounds != null) { + builder.field(DOWNSAMPLING_FIELD.getPreferredName(), downsamplingRounds); + } + if (downsamplingMethod != null) { + builder.field(DOWNSAMPLING_METHOD_FIELD.getPreferredName(), downsamplingMethod.toString()); } if (rolloverConfiguration != null) { builder.field(ROLLOVER_FIELD.getPreferredName()); @@ -522,9 +567,9 @@ public String displayName() { /** * A round represents the configuration for when and how elasticsearch will downsample a backing index. * @param after is a TimeValue configuring how old (based on generation age) should a backing index be before downsampling - * @param config contains the interval that the backing index is going to be downsampled. + * @param fixedInterval contains the interval that the backing index is going to be downsampled. */ - public record DownsamplingRound(TimeValue after, DownsampleConfig config) implements Writeable, ToXContentObject { + public record DownsamplingRound(TimeValue after, DateHistogramInterval fixedInterval) implements Writeable, ToXContentObject { public static final ParseField AFTER_FIELD = new ParseField("after"); public static final ParseField FIXED_INTERVAL_FIELD = new ParseField("fixed_interval"); @@ -533,7 +578,7 @@ public record DownsamplingRound(TimeValue after, DownsampleConfig config) implem private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "downsampling_round", false, - (args, unused) -> new DownsamplingRound((TimeValue) args[0], new DownsampleConfig((DateHistogramInterval) args[1])) + (args, unused) -> new DownsamplingRound((TimeValue) args[0], (DateHistogramInterval) args[1]) ); static { @@ -545,7 +590,7 @@ public record DownsamplingRound(TimeValue after, DownsampleConfig config) implem PARSER.declareField( constructorArg(), p -> new DateHistogramInterval(p.text()), - new ParseField(FIXED_INTERVAL_FIELD.getPreferredName()), + FIXED_INTERVAL_FIELD, ObjectParser.ValueType.STRING ); } @@ -576,19 +621,23 @@ static void validateRounds(List rounds) { + "." ); } - DownsampleConfig.validateSourceAndTargetIntervals(previous.config(), round.config()); + DownsampleConfig.validateSourceAndTargetIntervals(previous.fixedInterval(), round.fixedInterval()); } } } public static DownsamplingRound read(StreamInput in) throws IOException { - return new DownsamplingRound(in.readTimeValue(), new DownsampleConfig(in)); + TimeValue after = in.readTimeValue(); + DateHistogramInterval fixedInterval = in.getTransportVersion().supports(ADD_SAMPLE_METHOD_DOWNSAMPLE_DLM) + ? new DateHistogramInterval(in) + : new DownsampleConfig(in).getFixedInterval(); + return new DownsamplingRound(after, fixedInterval); } public DownsamplingRound { - if (config.getFixedInterval().estimateMillis() < FIVE_MINUTES_MILLIS) { + if (fixedInterval.estimateMillis() < FIVE_MINUTES_MILLIS) { throw new IllegalArgumentException( - "A downsampling round must have a fixed interval of at least five minutes but found: " + config.getFixedInterval() + "A downsampling round must have a fixed interval of at least five minutes but found: " + fixedInterval ); } } @@ -596,14 +645,18 @@ public static DownsamplingRound read(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { out.writeTimeValue(after); - out.writeWriteable(config); + if (out.getTransportVersion().supports(ADD_SAMPLE_METHOD_DOWNSAMPLE_DLM)) { + out.writeWriteable(fixedInterval); + } else { + out.writeWriteable(new DownsampleConfig(fixedInterval, null)); + } } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); builder.field(AFTER_FIELD.getPreferredName(), after.getStringRep()); - config.toXContentFragment(builder); + builder.field(FIXED_INTERVAL_FIELD.getPreferredName(), fixedInterval().toString()); builder.endObject(); return builder; } @@ -625,9 +678,16 @@ public String toString() { public static Template createDataLifecycleTemplate( boolean enabled, TimeValue dataRetention, - List downsampling + List downsamplingRounds, + DownsampleConfig.SamplingMethod downsamplingMethod ) { - return new Template(LifecycleType.DATA, enabled, ResettableValue.create(dataRetention), ResettableValue.create(downsampling)); + return new Template( + LifecycleType.DATA, + enabled, + ResettableValue.create(dataRetention), + ResettableValue.create(downsamplingRounds), + ResettableValue.create(downsamplingMethod) + ); } /** @@ -637,9 +697,10 @@ public static Template createDataLifecycleTemplate( public static Template createDataLifecycleTemplate( boolean enabled, ResettableValue dataRetention, - ResettableValue> downsampling + ResettableValue> downsamplingRounds, + ResettableValue downsamplingMethod ) { - return new Template(LifecycleType.DATA, enabled, dataRetention, downsampling); + return new Template(LifecycleType.DATA, enabled, dataRetention, downsamplingRounds, downsamplingMethod); } /** @@ -647,7 +708,13 @@ public static Template createDataLifecycleTemplate( * means it supports only setting the enabled and the retention. */ public static Template createFailuresLifecycleTemplate(boolean enabled, TimeValue dataRetention) { - return new Template(LifecycleType.FAILURES, enabled, ResettableValue.create(dataRetention), ResettableValue.undefined()); + return new Template( + LifecycleType.FAILURES, + enabled, + ResettableValue.create(dataRetention), + ResettableValue.undefined(), + ResettableValue.undefined() + ); } /** @@ -658,24 +725,34 @@ public record Template( LifecycleType lifecycleType, boolean enabled, ResettableValue dataRetention, - ResettableValue> downsampling + ResettableValue> downsamplingRounds, + ResettableValue downsamplingMethod ) implements ToXContentObject, Writeable { Template( LifecycleType lifecycleType, boolean enabled, TimeValue dataRetention, - List downsampling + List downsamplingRounds, + DownsampleConfig.SamplingMethod downsamplingMethod ) { - this(lifecycleType, enabled, ResettableValue.create(dataRetention), ResettableValue.create(downsampling)); + this( + lifecycleType, + enabled, + ResettableValue.create(dataRetention), + ResettableValue.create(downsamplingRounds), + ResettableValue.create(downsamplingMethod) + ); } public Template { - if (lifecycleType == LifecycleType.FAILURES && downsampling.get() != null) { + if (lifecycleType == LifecycleType.FAILURES && downsamplingRounds.get() != null) { throw new IllegalArgumentException(DOWNSAMPLING_NOT_SUPPORTED_ERROR_MESSAGE); } - if (downsampling.isDefined() && downsampling.get() != null) { - DownsamplingRound.validateRounds(downsampling.get()); + if (downsamplingRounds.isDefined() && downsamplingRounds.get() != null) { + DownsamplingRound.validateRounds(downsamplingRounds.get()); + } else if (downsamplingMethod.isDefined() && downsamplingMethod.get() != null) { + throw new IllegalArgumentException(DOWNSAMPLING_METHOD_WITHOUT_ROUNDS_ERROR); } } @@ -683,6 +760,7 @@ public record Template( LifecycleType.DATA, true, ResettableValue.undefined(), + ResettableValue.undefined(), ResettableValue.undefined() ); @@ -694,7 +772,8 @@ public record Template( lt, args[0] == null || (boolean) args[0], args[1] == null ? ResettableValue.undefined() : (ResettableValue) args[1], - args[2] == null ? ResettableValue.undefined() : (ResettableValue>) args[2] + args[2] == null ? ResettableValue.undefined() : (ResettableValue>) args[2], + args[3] == null ? ResettableValue.undefined() : (ResettableValue) args[3] ) ); @@ -713,6 +792,10 @@ public record Template( return ResettableValue.create(AbstractObjectParser.parseArray(p, null, DownsamplingRound::fromXContent)); } }, DOWNSAMPLING_FIELD, ObjectParser.ValueType.OBJECT_ARRAY_OR_NULL); + PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> { + String value = p.textOrNull(); + return value == null ? ResettableValue.reset() : ResettableValue.create(DownsampleConfig.SamplingMethod.fromString(value)); + }, DOWNSAMPLING_METHOD_FIELD, ObjectParser.ValueType.STRING_OR_NULL); } @Override @@ -727,15 +810,18 @@ public void writeTo(StreamOutput out) throws IOException { } if (out.getTransportVersion().onOrAfter(ADDED_ENABLED_FLAG_VERSION)) { if (out.getTransportVersion().supports(INTRODUCE_LIFECYCLE_TEMPLATE)) { - ResettableValue.write(out, downsampling, StreamOutput::writeCollection); + ResettableValue.write(out, downsamplingRounds, StreamOutput::writeCollection); } else { - writeLegacyValue(out, downsampling, StreamOutput::writeCollection); + writeLegacyValue(out, downsamplingRounds, StreamOutput::writeCollection); } out.writeBoolean(enabled); } if (out.getTransportVersion().supports(INTRODUCE_FAILURES_LIFECYCLE)) { lifecycleType.writeTo(out); } + if (out.getTransportVersion().supports(ADD_SAMPLE_METHOD_DOWNSAMPLE_DLM)) { + ResettableValue.write(out, downsamplingMethod, StreamOutput::writeWriteable); + } } /** @@ -775,7 +861,7 @@ static ResettableValue readLegacyValues(StreamInput in, Writeable.Reader< public static Template read(StreamInput in) throws IOException { boolean enabled = true; ResettableValue dataRetention = ResettableValue.undefined(); - ResettableValue> downsampling = ResettableValue.undefined(); + ResettableValue> downsamplingRounds = ResettableValue.undefined(); // The order of the fields is like this for bwc reasons if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_9_X)) { @@ -787,16 +873,20 @@ public static Template read(StreamInput in) throws IOException { } if (in.getTransportVersion().onOrAfter(ADDED_ENABLED_FLAG_VERSION)) { if (in.getTransportVersion().supports(INTRODUCE_LIFECYCLE_TEMPLATE)) { - downsampling = ResettableValue.read(in, i -> i.readCollectionAsList(DownsamplingRound::read)); + downsamplingRounds = ResettableValue.read(in, i -> i.readCollectionAsList(DownsamplingRound::read)); } else { - downsampling = readLegacyValues(in, i -> i.readCollectionAsList(DownsamplingRound::read)); + downsamplingRounds = readLegacyValues(in, i -> i.readCollectionAsList(DownsamplingRound::read)); } enabled = in.readBoolean(); } var lifecycleTarget = in.getTransportVersion().supports(INTRODUCE_FAILURES_LIFECYCLE) ? LifecycleType.read(in) : LifecycleType.DATA; - return new Template(lifecycleTarget, enabled, dataRetention, downsampling); + ResettableValue downsamplingMethod = in.getTransportVersion() + .supports(ADD_SAMPLE_METHOD_DOWNSAMPLE_DLM) + ? ResettableValue.read(in, DownsampleConfig.SamplingMethod::read) + : ResettableValue.undefined(); + return new Template(lifecycleTarget, enabled, dataRetention, downsamplingRounds, downsamplingMethod); } public static Template dataLifecycleTemplatefromXContent(XContentParser parser) throws IOException { @@ -830,7 +920,13 @@ public XContentBuilder toXContent( builder.startObject(); builder.field(ENABLED_FIELD.getPreferredName(), enabled); dataRetention.toXContent(builder, params, DATA_RETENTION_FIELD.getPreferredName(), TimeValue::getStringRep); - downsampling.toXContent(builder, params, DOWNSAMPLING_FIELD.getPreferredName()); + downsamplingRounds.toXContent(builder, params, DOWNSAMPLING_FIELD.getPreferredName()); + downsamplingMethod.toXContent( + builder, + params, + DOWNSAMPLING_METHOD_FIELD.getPreferredName(), + DownsampleConfig.SamplingMethod::toString + ); if (rolloverConfiguration != null) { builder.field(ROLLOVER_FIELD.getPreferredName()); rolloverConfiguration.evaluateAndConvertToXContent( @@ -844,7 +940,7 @@ public XContentBuilder toXContent( } public DataStreamLifecycle toDataStreamLifecycle() { - return new DataStreamLifecycle(lifecycleType, enabled, dataRetention.get(), downsampling.get()); + return new DataStreamLifecycle(lifecycleType, enabled, dataRetention.get(), downsamplingRounds.get(), downsamplingMethod.get()); } } @@ -881,7 +977,9 @@ public static class Builder { @Nullable private TimeValue dataRetention = null; @Nullable - private List downsampling = null; + private List downsamplingRounds = null; + @Nullable + private DownsampleConfig.SamplingMethod downsamplingMethod = null; private Builder(LifecycleType lifecycleType) { this.lifecycleType = lifecycleType; @@ -891,21 +989,24 @@ private Builder(DataStreamLifecycle.Template template) { lifecycleType = template.lifecycleType(); enabled = template.enabled(); dataRetention = template.dataRetention().get(); - downsampling = template.downsampling().get(); + downsamplingRounds = template.downsamplingRounds().get(); + downsamplingMethod = template.downsamplingMethod().get(); } private Builder(DataStreamLifecycle lifecycle) { lifecycleType = lifecycle.lifecycleType; enabled = lifecycle.enabled(); dataRetention = lifecycle.dataRetention(); - downsampling = lifecycle.downsampling(); + downsamplingRounds = lifecycle.downsamplingRounds(); + downsamplingMethod = lifecycle.downsamplingMethod(); } public Builder composeTemplate(DataStreamLifecycle.Template template) { assert lifecycleType == template.lifecycleType() : "Trying to compose templates with different lifecycle types"; enabled(template.enabled()); dataRetention(template.dataRetention()); - downsampling(template.downsampling()); + downsamplingRounds(template.downsamplingRounds()); + downsamplingMethod(template.downsamplingMethod()); return this; } @@ -926,24 +1027,36 @@ public Builder dataRetention(@Nullable TimeValue dataRetention) { return this; } - public Builder downsampling(ResettableValue> downsampling) { + public Builder downsamplingRounds(ResettableValue> downsampling) { if (downsampling.isDefined()) { - this.downsampling = downsampling.get(); + this.downsamplingRounds = downsampling.get(); + } + return this; + } + + public Builder downsamplingRounds(@Nullable List downsampling) { + this.downsamplingRounds = downsampling; + return this; + } + + public Builder downsamplingMethod(ResettableValue downsamplingMethod) { + if (downsamplingMethod.isDefined()) { + this.downsamplingMethod = downsamplingMethod.get(); } return this; } - public Builder downsampling(@Nullable List downsampling) { - this.downsampling = downsampling; + public Builder downsamplingMethod(@Nullable DownsampleConfig.SamplingMethod downsamplingMethod) { + this.downsamplingMethod = downsamplingMethod; return this; } public DataStreamLifecycle build() { - return new DataStreamLifecycle(lifecycleType, enabled, dataRetention, downsampling); + return new DataStreamLifecycle(lifecycleType, enabled, dataRetention, downsamplingRounds, downsamplingMethod); } public Template buildTemplate() { - return new Template(lifecycleType, enabled, dataRetention, downsampling); + return new Template(lifecycleType, enabled, dataRetention, downsamplingRounds, downsamplingMethod); } } diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestPutComponentTemplateAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestPutComponentTemplateAction.java index aff4287ecbc58..d790b5ddb70db 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestPutComponentTemplateAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestPutComponentTemplateAction.java @@ -32,10 +32,12 @@ public class RestPutComponentTemplateAction extends BaseRestHandler { public static final String SUPPORTS_FAILURE_STORE_LIFECYCLE = "data_stream_options.failure_store.lifecycle"; public static final String SUPPORTS_FAILURE_STORE = "data_stream_options.failure_store"; private static final String COMPONENT_TEMPLATE_TRACKING_INFO = "component_template_tracking_info"; + static final String SUPPORTS_DOWNSAMPLING_METHOD = "dlm.downsampling_method"; private static final Set CAPABILITIES = Set.of( SUPPORTS_FAILURE_STORE, SUPPORTS_FAILURE_STORE_LIFECYCLE, - COMPONENT_TEMPLATE_TRACKING_INFO + COMPONENT_TEMPLATE_TRACKING_INFO, + SUPPORTS_DOWNSAMPLING_METHOD ); @Override diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestPutComposableIndexTemplateAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestPutComposableIndexTemplateAction.java index 28f5430775004..15f713ea583d7 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestPutComposableIndexTemplateAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestPutComposableIndexTemplateAction.java @@ -25,6 +25,7 @@ import static org.elasticsearch.rest.RestRequest.Method.POST; import static org.elasticsearch.rest.RestRequest.Method.PUT; import static org.elasticsearch.rest.RestUtils.getMasterNodeTimeout; +import static org.elasticsearch.rest.action.admin.indices.RestPutComponentTemplateAction.SUPPORTS_DOWNSAMPLING_METHOD; import static org.elasticsearch.rest.action.admin.indices.RestPutComponentTemplateAction.SUPPORTS_FAILURE_STORE; import static org.elasticsearch.rest.action.admin.indices.RestPutComponentTemplateAction.SUPPORTS_FAILURE_STORE_LIFECYCLE; @@ -32,10 +33,12 @@ public class RestPutComposableIndexTemplateAction extends BaseRestHandler { private static final String INDEX_TEMPLATE_TRACKING_INFO = "index_template_tracking_info"; + private static final Set CAPABILITIES = Set.of( SUPPORTS_FAILURE_STORE, SUPPORTS_FAILURE_STORE_LIFECYCLE, - INDEX_TEMPLATE_TRACKING_INFO + INDEX_TEMPLATE_TRACKING_INFO, + SUPPORTS_DOWNSAMPLING_METHOD ); @Override diff --git a/server/src/main/resources/transport/definitions/referable/add_sample_method_downsample_dlm.csv b/server/src/main/resources/transport/definitions/referable/add_sample_method_downsample_dlm.csv new file mode 100644 index 0000000000000..9b32a5cf8b643 --- /dev/null +++ b/server/src/main/resources/transport/definitions/referable/add_sample_method_downsample_dlm.csv @@ -0,0 +1 @@ +9211000 diff --git a/server/src/main/resources/transport/upper_bounds/9.3.csv b/server/src/main/resources/transport/upper_bounds/9.3.csv index 88b7f13fd8f9f..a70b776735734 100644 --- a/server/src/main/resources/transport/upper_bounds/9.3.csv +++ b/server/src/main/resources/transport/upper_bounds/9.3.csv @@ -1 +1 @@ -add_sample_method_downsample_ilm,9210000 +add_sample_method_downsample_dlm,9211000 diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/GetDataStreamActionTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/GetDataStreamActionTests.java index c90669650a949..c9027799a57c6 100644 --- a/server/src/test/java/org/elasticsearch/action/datastreams/GetDataStreamActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/datastreams/GetDataStreamActionTests.java @@ -107,7 +107,7 @@ private static GetDataStreamAction.Response.DataStreamInfo newDataStreamInfo(boo private static DataStream newDataStreamInstance(boolean isSystem, TimeValue retention) { List indices = List.of(new Index(randomAlphaOfLength(10), randomAlphaOfLength(10))); - DataStreamLifecycle lifecycle = DataStreamLifecycle.createDataLifecycle(true, retention, null); + DataStreamLifecycle lifecycle = DataStreamLifecycle.createDataLifecycle(true, retention, null, null); Settings settings = randomSettings(); CompressedXContent mappings = randomMappings(); return DataStream.builder(randomAlphaOfLength(50), indices) diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/lifecycle/ExplainIndexDataStreamLifecycleTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/lifecycle/ExplainIndexDataStreamLifecycleTests.java index 43a39f762ce69..2fb4a1c0ced4f 100644 --- a/server/src/test/java/org/elasticsearch/action/datastreams/lifecycle/ExplainIndexDataStreamLifecycleTests.java +++ b/server/src/test/java/org/elasticsearch/action/datastreams/lifecycle/ExplainIndexDataStreamLifecycleTests.java @@ -200,7 +200,7 @@ public void testToXContent() throws Exception { TimeValue configuredRetention = TimeValue.timeValueDays(100); TimeValue globalDefaultRetention = TimeValue.timeValueDays(10); TimeValue globalMaxRetention = TimeValue.timeValueDays(50); - DataStreamLifecycle dataStreamLifecycle = DataStreamLifecycle.createDataLifecycle(true, configuredRetention, null); + DataStreamLifecycle dataStreamLifecycle = DataStreamLifecycle.createDataLifecycle(true, configuredRetention, null, null); { boolean isSystemDataStream = true; ExplainIndexDataStreamLifecycle explainIndexDataStreamLifecycle = createManagedIndexDataStreamLifecycleExplanation( diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/lifecycle/GetDataStreamLifecycleActionTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/lifecycle/GetDataStreamLifecycleActionTests.java index 32269d0bc5113..7c72764901505 100644 --- a/server/src/test/java/org/elasticsearch/action/datastreams/lifecycle/GetDataStreamLifecycleActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/datastreams/lifecycle/GetDataStreamLifecycleActionTests.java @@ -96,7 +96,7 @@ public void testDataStreamLifecycleToXContent() throws Exception { TimeValue globalDefaultRetention = TimeValue.timeValueDays(10); TimeValue globalMaxRetention = TimeValue.timeValueDays(50); DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention(globalDefaultRetention, globalMaxRetention); - DataStreamLifecycle lifecycle = DataStreamLifecycle.createDataLifecycle(true, configuredRetention, null); + DataStreamLifecycle lifecycle = DataStreamLifecycle.createDataLifecycle(true, configuredRetention, null, null); { boolean isInternalDataStream = true; GetDataStreamLifecycleAction.Response.DataStreamLifecycle explainIndexDataStreamLifecycle = createDataStreamLifecycle( diff --git a/server/src/test/java/org/elasticsearch/action/downsample/DownsampleConfigTests.java b/server/src/test/java/org/elasticsearch/action/downsample/DownsampleConfigTests.java index 95611f319d6ed..3469564b28729 100644 --- a/server/src/test/java/org/elasticsearch/action/downsample/DownsampleConfigTests.java +++ b/server/src/test/java/org/elasticsearch/action/downsample/DownsampleConfigTests.java @@ -49,7 +49,7 @@ public static DownsampleConfig randomConfig() { return new DownsampleConfig(randomInterval(), randomSamplingMethod()); } - private static DownsampleConfig.SamplingMethod randomSamplingMethod() { + public static DownsampleConfig.SamplingMethod randomSamplingMethod() { if (between(0, DownsampleConfig.SamplingMethod.values().length) == 0) { return null; } else { diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamFailureStoreTemplateTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamFailureStoreTemplateTests.java index 3b7834b67d7a1..950eece0793b7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamFailureStoreTemplateTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamFailureStoreTemplateTests.java @@ -146,7 +146,8 @@ private static DataStreamFailureStore.Template normalise(DataStreamFailureStore. template.lifecycleType(), template.enabled(), template.dataRetention().get(), - template.downsampling().get() + template.downsamplingRounds().get(), + template.downsamplingMethod().get() ) ) ); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleTemplateTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleTemplateTests.java index 1d1c352788346..e17d6891b71d6 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleTemplateTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleTemplateTests.java @@ -10,6 +10,7 @@ package org.elasticsearch.cluster.metadata; import org.elasticsearch.action.downsample.DownsampleConfig; +import org.elasticsearch.action.downsample.DownsampleConfigTests; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.core.TimeValue; @@ -44,27 +45,38 @@ protected DataStreamLifecycle.Template mutateInstance(DataStreamLifecycle.Templa var lifecycleTarget = instance.lifecycleType(); var enabled = instance.enabled(); var retention = instance.dataRetention(); - var downsampling = instance.downsampling(); - switch (randomInt(3)) { + var downsamplingRounds = instance.downsamplingRounds(); + var downsamplingMethod = instance.downsamplingMethod(); + switch (randomInt(4)) { case 0 -> { lifecycleTarget = lifecycleTarget == DataStreamLifecycle.LifecycleType.DATA ? DataStreamLifecycle.LifecycleType.FAILURES : DataStreamLifecycle.LifecycleType.DATA; if (lifecycleTarget == DataStreamLifecycle.LifecycleType.FAILURES) { - downsampling = ResettableValue.undefined(); + downsamplingRounds = ResettableValue.undefined(); + downsamplingMethod = ResettableValue.undefined(); } } case 1 -> enabled = enabled == false; case 2 -> retention = randomValueOtherThan(retention, DataStreamLifecycleTemplateTests::randomRetention); case 3 -> { - downsampling = randomValueOtherThan(downsampling, DataStreamLifecycleTemplateTests::randomDownsampling); - if (downsampling.get() != null) { + downsamplingRounds = randomValueOtherThan(downsamplingRounds, DataStreamLifecycleTemplateTests::randomDownsamplingRounds); + if (downsamplingRounds.get() != null) { + lifecycleTarget = DataStreamLifecycle.LifecycleType.DATA; + } else { + downsamplingMethod = ResettableValue.undefined(); + } + } + case 4 -> { + downsamplingMethod = randomValueOtherThan(downsamplingMethod, DataStreamLifecycleTemplateTests::randomDownsamplingMethod); + if (downsamplingMethod.get() != null && downsamplingRounds.get() == null) { + downsamplingRounds = ResettableValue.create(DataStreamLifecycleTests.randomDownsamplingRounds()); lifecycleTarget = DataStreamLifecycle.LifecycleType.DATA; } } default -> throw new AssertionError("Illegal randomisation branch"); } - return new DataStreamLifecycle.Template(lifecycleTarget, enabled, retention, downsampling); + return new DataStreamLifecycle.Template(lifecycleTarget, enabled, retention, downsamplingRounds, downsamplingMethod); } public void testDataLifecycleXContentSerialization() throws IOException { @@ -104,16 +116,10 @@ public void testInvalidDownsamplingConfiguration() { IllegalArgumentException exception = expectThrows( IllegalArgumentException.class, () -> DataStreamLifecycle.dataLifecycleBuilder() - .downsampling( + .downsamplingRounds( List.of( - new DataStreamLifecycle.DownsamplingRound( - TimeValue.timeValueDays(10), - new DownsampleConfig(new DateHistogramInterval("2h")) - ), - new DataStreamLifecycle.DownsamplingRound( - TimeValue.timeValueDays(3), - new DownsampleConfig(new DateHistogramInterval("2h")) - ) + new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueDays(10), new DateHistogramInterval("2h")), + new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueDays(3), new DateHistogramInterval("2h")) ) ) .buildTemplate() @@ -127,16 +133,10 @@ public void testInvalidDownsamplingConfiguration() { IllegalArgumentException exception = expectThrows( IllegalArgumentException.class, () -> DataStreamLifecycle.dataLifecycleBuilder() - .downsampling( + .downsamplingRounds( List.of( - new DataStreamLifecycle.DownsamplingRound( - TimeValue.timeValueDays(10), - new DownsampleConfig(new DateHistogramInterval("2h")) - ), - new DataStreamLifecycle.DownsamplingRound( - TimeValue.timeValueDays(30), - new DownsampleConfig(new DateHistogramInterval("2h")) - ) + new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueDays(10), new DateHistogramInterval("2h")), + new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueDays(30), new DateHistogramInterval("2h")) ) ) .buildTemplate() @@ -147,16 +147,10 @@ public void testInvalidDownsamplingConfiguration() { IllegalArgumentException exception = expectThrows( IllegalArgumentException.class, () -> DataStreamLifecycle.dataLifecycleBuilder() - .downsampling( + .downsamplingRounds( List.of( - new DataStreamLifecycle.DownsamplingRound( - TimeValue.timeValueDays(10), - new DownsampleConfig(new DateHistogramInterval("2h")) - ), - new DataStreamLifecycle.DownsamplingRound( - TimeValue.timeValueDays(30), - new DownsampleConfig(new DateHistogramInterval("3h")) - ) + new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueDays(10), new DateHistogramInterval("2h")), + new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueDays(30), new DateHistogramInterval("3h")) ) ) .buildTemplate() @@ -166,7 +160,7 @@ public void testInvalidDownsamplingConfiguration() { { IllegalArgumentException exception = expectThrows( IllegalArgumentException.class, - () -> DataStreamLifecycle.dataLifecycleBuilder().downsampling((List.of())).buildTemplate() + () -> DataStreamLifecycle.dataLifecycleBuilder().downsamplingRounds((List.of())).buildTemplate() ); assertThat(exception.getMessage(), equalTo("Downsampling configuration should have at least one round configured.")); } @@ -174,13 +168,13 @@ public void testInvalidDownsamplingConfiguration() { IllegalArgumentException exception = expectThrows( IllegalArgumentException.class, () -> DataStreamLifecycle.dataLifecycleBuilder() - .downsampling( + .downsamplingRounds( Stream.iterate(1, i -> i * 2) .limit(12) .map( i -> new DataStreamLifecycle.DownsamplingRound( TimeValue.timeValueDays(i), - new DownsampleConfig(new DateHistogramInterval(i + "h")) + new DateHistogramInterval(i + "h") ) ) .toList() @@ -194,13 +188,8 @@ public void testInvalidDownsamplingConfiguration() { IllegalArgumentException exception = expectThrows( IllegalArgumentException.class, () -> DataStreamLifecycle.dataLifecycleBuilder() - .downsampling( - List.of( - new DataStreamLifecycle.DownsamplingRound( - TimeValue.timeValueDays(10), - new DownsampleConfig(new DateHistogramInterval("2m")) - ) - ) + .downsamplingRounds( + List.of(new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueDays(10), new DateHistogramInterval("2m"))) ) .buildTemplate() ); @@ -209,10 +198,31 @@ public void testInvalidDownsamplingConfiguration() { equalTo("A downsampling round must have a fixed interval of at least five minutes but found: 2m") ); } + + { + IllegalArgumentException exception = expectThrows( + IllegalArgumentException.class, + () -> DataStreamLifecycle.dataLifecycleBuilder() + .downsamplingMethod(randomFrom(DownsampleConfig.SamplingMethod.values())) + .buildTemplate() + ); + assertThat( + exception.getMessage(), + equalTo("Downsampling method can only be set when there is at least one downsampling round.") + ); + } } public static DataStreamLifecycle.Template randomDataLifecycleTemplate() { - return DataStreamLifecycle.createDataLifecycleTemplate(randomBoolean(), randomRetention(), randomDownsampling()); + ResettableValue> downsamplingRounds = randomDownsamplingRounds(); + return DataStreamLifecycle.createDataLifecycleTemplate( + randomBoolean(), + randomRetention(), + downsamplingRounds, + downsamplingRounds.get() == null + ? randomBoolean() ? ResettableValue.undefined() : ResettableValue.reset() + : randomDownsamplingMethod() + ); } public void testInvalidLifecycleConfiguration() { @@ -222,13 +232,29 @@ public void testInvalidLifecycleConfiguration() { DataStreamLifecycle.LifecycleType.FAILURES, randomBoolean(), randomBoolean() ? null : DataStreamLifecycleTests.randomPositiveTimeValue(), - DataStreamLifecycleTests.randomDownsampling() + DataStreamLifecycleTests.randomDownsamplingRounds(), + null ) ); assertThat( exception.getMessage(), containsString("Failure store lifecycle does not support downsampling, please remove the downsampling configuration.") ); + + exception = expectThrows( + IllegalArgumentException.class, + () -> new DataStreamLifecycle.Template( + DataStreamLifecycle.LifecycleType.DATA, + randomBoolean(), + randomBoolean() ? null : DataStreamLifecycleTests.randomPositiveTimeValue(), + null, + randomFrom(DownsampleConfig.SamplingMethod.values()) + ) + ); + assertThat( + exception.getMessage(), + containsString("Downsampling method can only be set when there is at least one downsampling round.") + ); } /** @@ -240,6 +266,7 @@ public static DataStreamLifecycle.Template randomFailuresLifecycleTemplate() { DataStreamLifecycle.LifecycleType.FAILURES, randomBoolean(), randomRetention(), + ResettableValue.undefined(), ResettableValue.undefined() ); } @@ -253,10 +280,18 @@ private static ResettableValue randomRetention() { }; } - private static ResettableValue> randomDownsampling() { + private static ResettableValue> randomDownsamplingRounds() { + return switch (randomIntBetween(0, 1)) { + case 0 -> ResettableValue.reset(); + case 1 -> ResettableValue.create(DataStreamLifecycleTests.randomDownsamplingRounds()); + default -> throw new IllegalStateException("Unknown randomisation path"); + }; + } + + private static ResettableValue randomDownsamplingMethod() { return switch (randomIntBetween(0, 1)) { case 0 -> ResettableValue.reset(); - case 1 -> ResettableValue.create(DataStreamLifecycleTests.randomDownsampling()); + case 1 -> ResettableValue.create(DownsampleConfigTests.randomSamplingMethod()); default -> throw new IllegalStateException("Unknown randomisation path"); }; } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleTests.java index 198163645c83b..64bc13cb0dc98 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleTests.java @@ -12,7 +12,7 @@ import org.elasticsearch.action.admin.indices.rollover.RolloverConditions; import org.elasticsearch.action.admin.indices.rollover.RolloverConfiguration; import org.elasticsearch.action.admin.indices.rollover.RolloverConfigurationTests; -import org.elasticsearch.action.downsample.DownsampleConfig; +import org.elasticsearch.action.downsample.DownsampleConfigTests; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.ClusterSettings; @@ -64,14 +64,16 @@ protected DataStreamLifecycle mutateInstance(DataStreamLifecycle instance) throw : DataStreamLifecycle.LifecycleType.DATA; var enabled = instance.enabled(); var retention = instance.dataRetention(); - var downsampling = instance.downsampling(); - switch (randomInt(3)) { + var downsamplingRounds = instance.downsamplingRounds(); + var downsamplingMethod = instance.downsamplingMethod(); + switch (randomInt(4)) { case 0 -> { if (instance.targetsFailureStore()) { lifecycleTarget = DataStreamLifecycle.LifecycleType.DATA; } else { lifecycleTarget = DataStreamLifecycle.LifecycleType.FAILURES; - downsampling = null; + downsamplingRounds = null; + downsamplingMethod = null; } } case 1 -> { @@ -82,20 +84,31 @@ protected DataStreamLifecycle mutateInstance(DataStreamLifecycle instance) throw } } case 2 -> { - if (downsampling == null) { - downsampling = randomDownsampling(); + if (downsamplingRounds == null) { + downsamplingRounds = randomDownsamplingRounds(); if (lifecycleTarget == DataStreamLifecycle.LifecycleType.FAILURES) { lifecycleTarget = DataStreamLifecycle.LifecycleType.DATA; } } else { - downsampling = randomBoolean() + downsamplingRounds = randomBoolean() ? null - : randomValueOtherThan(downsampling, DataStreamLifecycleTests::randomDownsampling); + : randomValueOtherThan(downsamplingRounds, DataStreamLifecycleTests::randomDownsamplingRounds); + if (downsamplingRounds == null) { + downsamplingMethod = null; + } + } + } + case 3 -> { + // We need to enable downsampling in order to add a non-value downsampling method + downsamplingMethod = randomValueOtherThan(downsamplingMethod, DownsampleConfigTests::randomSamplingMethod); + if (downsamplingMethod != null && downsamplingRounds == null) { + downsamplingRounds = randomDownsamplingRounds(); + lifecycleTarget = DataStreamLifecycle.LifecycleType.DATA; } } default -> enabled = enabled == false; } - return new DataStreamLifecycle(lifecycleTarget, enabled, retention, downsampling); + return new DataStreamLifecycle(lifecycleTarget, enabled, retention, downsamplingRounds, downsamplingMethod); } public void testDataLifecycleXContentSerialization() throws IOException { @@ -215,16 +228,10 @@ public void testInvalidDownsamplingConfiguration() { IllegalArgumentException exception = expectThrows( IllegalArgumentException.class, () -> DataStreamLifecycle.dataLifecycleBuilder() - .downsampling( + .downsamplingRounds( List.of( - new DataStreamLifecycle.DownsamplingRound( - TimeValue.timeValueDays(10), - new DownsampleConfig(new DateHistogramInterval("2h")) - ), - new DataStreamLifecycle.DownsamplingRound( - TimeValue.timeValueDays(3), - new DownsampleConfig(new DateHistogramInterval("2h")) - ) + new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueDays(10), new DateHistogramInterval("2h")), + new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueDays(3), new DateHistogramInterval("2h")) ) ) .build() @@ -238,16 +245,10 @@ public void testInvalidDownsamplingConfiguration() { IllegalArgumentException exception = expectThrows( IllegalArgumentException.class, () -> DataStreamLifecycle.dataLifecycleBuilder() - .downsampling( + .downsamplingRounds( List.of( - new DataStreamLifecycle.DownsamplingRound( - TimeValue.timeValueDays(10), - new DownsampleConfig(new DateHistogramInterval("2h")) - ), - new DataStreamLifecycle.DownsamplingRound( - TimeValue.timeValueDays(30), - new DownsampleConfig(new DateHistogramInterval("2h")) - ) + new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueDays(10), new DateHistogramInterval("2h")), + new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueDays(30), new DateHistogramInterval("2h")) ) ) .build() @@ -258,16 +259,10 @@ public void testInvalidDownsamplingConfiguration() { IllegalArgumentException exception = expectThrows( IllegalArgumentException.class, () -> DataStreamLifecycle.dataLifecycleBuilder() - .downsampling( + .downsamplingRounds( List.of( - new DataStreamLifecycle.DownsamplingRound( - TimeValue.timeValueDays(10), - new DownsampleConfig(new DateHistogramInterval("2h")) - ), - new DataStreamLifecycle.DownsamplingRound( - TimeValue.timeValueDays(30), - new DownsampleConfig(new DateHistogramInterval("3h")) - ) + new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueDays(10), new DateHistogramInterval("2h")), + new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueDays(30), new DateHistogramInterval("3h")) ) ) .build() @@ -277,7 +272,7 @@ public void testInvalidDownsamplingConfiguration() { { IllegalArgumentException exception = expectThrows( IllegalArgumentException.class, - () -> DataStreamLifecycle.dataLifecycleBuilder().downsampling((List.of())).build() + () -> DataStreamLifecycle.dataLifecycleBuilder().downsamplingRounds((List.of())).build() ); assertThat(exception.getMessage(), equalTo("Downsampling configuration should have at least one round configured.")); } @@ -285,13 +280,13 @@ public void testInvalidDownsamplingConfiguration() { IllegalArgumentException exception = expectThrows( IllegalArgumentException.class, () -> DataStreamLifecycle.dataLifecycleBuilder() - .downsampling( + .downsamplingRounds( Stream.iterate(1, i -> i * 2) .limit(12) .map( i -> new DataStreamLifecycle.DownsamplingRound( TimeValue.timeValueDays(i), - new DownsampleConfig(new DateHistogramInterval(i + "h")) + new DateHistogramInterval(i + "h") ) ) .toList() @@ -305,13 +300,8 @@ public void testInvalidDownsamplingConfiguration() { IllegalArgumentException exception = expectThrows( IllegalArgumentException.class, () -> DataStreamLifecycle.dataLifecycleBuilder() - .downsampling( - List.of( - new DataStreamLifecycle.DownsamplingRound( - TimeValue.timeValueDays(10), - new DownsampleConfig(new DateHistogramInterval("2m")) - ) - ) + .downsamplingRounds( + List.of(new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueDays(10), new DateHistogramInterval("2m"))) ) .build() ); @@ -326,7 +316,7 @@ public void testEffectiveRetention() { // No retention in the data stream lifecycle { DataStreamLifecycle noDataRetentionLifecycle = DataStreamLifecycle.dataLifecycleBuilder() - .downsampling(randomDownsampling()) + .downsamplingRounds(randomDownsamplingRounds()) .build(); DataStreamLifecycle noFailuresRetentionLifecycle = DataStreamLifecycle.failuresLifecycleBuilder().build(); TimeValue maxRetention = TimeValue.timeValueDays(randomIntBetween(50, 100)); @@ -381,7 +371,7 @@ public void testEffectiveRetention() { TimeValue dataStreamRetention = TimeValue.timeValueDays(randomIntBetween(5, 100)); DataStreamLifecycle dataLifecycleRetention = DataStreamLifecycle.dataLifecycleBuilder() .dataRetention(dataStreamRetention) - .downsampling(randomDownsampling()) + .downsamplingRounds(randomDownsamplingRounds()) .build(); DataStreamLifecycle failuresLifecycleRetention = DataStreamLifecycle.failuresLifecycleBuilder() .dataRetention(dataStreamRetention) @@ -510,9 +500,11 @@ public void testEffectiveRetentionParams() { } public static DataStreamLifecycle randomDataLifecycle() { + List downsamplingRounds = randomBoolean() ? null : randomDownsamplingRounds(); return DataStreamLifecycle.dataLifecycleBuilder() .dataRetention(randomBoolean() ? null : randomTimeValue(1, 365, TimeUnit.DAYS)) - .downsampling(randomBoolean() ? null : randomDownsampling()) + .downsamplingRounds(downsamplingRounds) + .downsamplingMethod(downsamplingRounds == null ? null : DownsampleConfigTests.randomSamplingMethod()) .enabled(randomBoolean()) .build(); } @@ -528,12 +520,12 @@ public static DataStreamLifecycle randomFailuresLifecycle() { .build(); } - static List randomDownsampling() { + static List randomDownsamplingRounds() { var count = randomIntBetween(0, 9); List rounds = new ArrayList<>(); var previous = new DataStreamLifecycle.DownsamplingRound( randomTimeValue(1, 365, TimeUnit.DAYS), - new DownsampleConfig(new DateHistogramInterval(randomIntBetween(1, 24) + "h")) + new DateHistogramInterval(randomIntBetween(1, 24) + "h") ); rounds.add(previous); for (int i = 0; i < count; i++) { @@ -546,9 +538,7 @@ static List randomDownsampling() { private static DataStreamLifecycle.DownsamplingRound nextRound(DataStreamLifecycle.DownsamplingRound previous) { var after = TimeValue.timeValueDays(previous.after().days() + randomIntBetween(1, 10)); - var fixedInterval = new DownsampleConfig( - new DateHistogramInterval((previous.config().getFixedInterval().estimateMillis() * randomIntBetween(2, 5)) + "ms") - ); + var fixedInterval = new DateHistogramInterval((previous.fixedInterval().estimateMillis() * randomIntBetween(2, 5)) + "ms"); return new DataStreamLifecycle.DownsamplingRound(after, fixedInterval); } @@ -561,7 +551,8 @@ public void testInvalidLifecycleConfiguration() { DataStreamLifecycle.LifecycleType.FAILURES, null, null, - DataStreamLifecycleTests.randomDownsampling() + DataStreamLifecycleTests.randomDownsamplingRounds(), + DownsampleConfigTests.randomSamplingMethod() ) ) ); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleWithRetentionWarningsTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleWithRetentionWarningsTests.java index 3b7a28a894156..9e1a06126fa96 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleWithRetentionWarningsTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleWithRetentionWarningsTests.java @@ -34,7 +34,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import static org.elasticsearch.cluster.metadata.DataStreamLifecycleTests.randomDownsampling; +import static org.elasticsearch.cluster.metadata.DataStreamLifecycleTests.randomDownsamplingRounds; import static org.elasticsearch.common.settings.Settings.builder; import static org.elasticsearch.indices.ShardLimitValidatorTests.createTestShardLimitService; import static org.hamcrest.Matchers.containsString; @@ -60,7 +60,9 @@ public void testNoHeaderWarning() { ThreadContext threadContext = new ThreadContext(Settings.EMPTY); HeaderWarning.setThreadContext(threadContext); - DataStreamLifecycle noRetentionLifecycle = DataStreamLifecycle.dataLifecycleBuilder().downsampling(randomDownsampling()).build(); + DataStreamLifecycle noRetentionLifecycle = DataStreamLifecycle.dataLifecycleBuilder() + .downsamplingRounds(randomDownsamplingRounds()) + .build(); noRetentionLifecycle.addWarningHeaderIfDataRetentionNotEffective(null, randomBoolean()); Map> responseHeaders = threadContext.getResponseHeaders(); assertThat(responseHeaders.isEmpty(), is(true)); @@ -68,7 +70,7 @@ public void testNoHeaderWarning() { TimeValue dataStreamRetention = TimeValue.timeValueDays(randomIntBetween(5, 100)); DataStreamLifecycle lifecycleWithRetention = DataStreamLifecycle.dataLifecycleBuilder() .dataRetention(dataStreamRetention) - .downsampling(randomDownsampling()) + .downsamplingRounds(randomDownsamplingRounds()) .build(); DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention( TimeValue.timeValueDays(2), @@ -83,7 +85,9 @@ public void testDefaultRetentionHeaderWarning() { ThreadContext threadContext = new ThreadContext(Settings.EMPTY); HeaderWarning.setThreadContext(threadContext); - DataStreamLifecycle noRetentionLifecycle = DataStreamLifecycle.dataLifecycleBuilder().downsampling(randomDownsampling()).build(); + DataStreamLifecycle noRetentionLifecycle = DataStreamLifecycle.dataLifecycleBuilder() + .downsamplingRounds(randomDownsamplingRounds()) + .build(); DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention( randomTimeValue(2, 10, TimeUnit.DAYS), randomBoolean() ? null : TimeValue.timeValueDays(20) @@ -107,7 +111,7 @@ public void testMaxRetentionHeaderWarning() { TimeValue maxRetention = randomTimeValue(2, 100, TimeUnit.DAYS); DataStreamLifecycle lifecycle = DataStreamLifecycle.dataLifecycleBuilder() .dataRetention(randomBoolean() ? null : TimeValue.timeValueDays(maxRetention.days() + 1)) - .downsampling(randomDownsampling()) + .downsamplingRounds(randomDownsamplingRounds()) .build(); DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention(null, maxRetention); lifecycle.addWarningHeaderIfDataRetentionNotEffective(globalRetention, false); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index c822b066629f8..e3a2147991ac4 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -13,7 +13,6 @@ import org.elasticsearch.action.admin.indices.rollover.RolloverConfiguration; import org.elasticsearch.action.admin.indices.rollover.RolloverConfigurationTests; import org.elasticsearch.action.admin.indices.rollover.RolloverInfo; -import org.elasticsearch.action.downsample.DownsampleConfig; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; @@ -1590,19 +1589,13 @@ public void testGetDownsampleRounds() { settings(IndexVersion.current()).put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES) .put("index.routing_path", "@timestamp"), DataStreamLifecycle.dataLifecycleBuilder() - .downsampling( + .downsamplingRounds( List.of( - new DataStreamLifecycle.DownsamplingRound( - TimeValue.timeValueMillis(2000), - new DownsampleConfig(new DateHistogramInterval("10m")) - ), - new DataStreamLifecycle.DownsamplingRound( - TimeValue.timeValueMillis(3200), - new DownsampleConfig(new DateHistogramInterval("100m")) - ), + new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueMillis(2000), new DateHistogramInterval("10m")), + new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueMillis(3200), new DateHistogramInterval("100m")), new DataStreamLifecycle.DownsamplingRound( TimeValue.timeValueMillis(3500), - new DownsampleConfig(new DateHistogramInterval("1000m")) + new DateHistogramInterval("1000m") ) ) @@ -1648,20 +1641,11 @@ public void testGetDownsampleRounds() { // no TSDB settings settings(IndexVersion.current()), DataStreamLifecycle.dataLifecycleBuilder() - .downsampling( + .downsamplingRounds( List.of( - new DataStreamLifecycle.DownsamplingRound( - TimeValue.timeValueMillis(2000), - new DownsampleConfig(new DateHistogramInterval("10m")) - ), - new DataStreamLifecycle.DownsamplingRound( - TimeValue.timeValueMillis(3200), - new DownsampleConfig(new DateHistogramInterval("100m")) - ), - new DataStreamLifecycle.DownsamplingRound( - TimeValue.timeValueMillis(3500), - new DownsampleConfig(new DateHistogramInterval("1000m")) - ) + new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueMillis(2000), new DateHistogramInterval("10m")), + new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueMillis(3200), new DateHistogramInterval("100m")), + new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueMillis(3500), new DateHistogramInterval("1000m")) ) ) diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateServiceTests.java index a94b92527bc53..4677ca1aed9b3 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateServiceTests.java @@ -1104,26 +1104,40 @@ public void testResolveLifecycle() throws Exception { DataStreamLifecycle.Template lifecycle45d = DataStreamLifecycle.dataLifecycleBuilder() .dataRetention(TimeValue.timeValueDays(45)) - .downsampling( - List.of( - new DataStreamLifecycle.DownsamplingRound( - TimeValue.timeValueDays(30), - new DownsampleConfig(new DateHistogramInterval("3h")) - ) - ) + .downsamplingRounds( + List.of(new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueDays(30), new DateHistogramInterval("3h"))) ) .buildTemplate(); String ct45d = "ct_45d"; project = addComponentTemplate(service, project, ct45d, lifecycle45d); + DataStreamLifecycle.Template lifecycle60d = DataStreamLifecycle.dataLifecycleBuilder() + .dataRetention(TimeValue.timeValueDays(60)) + .downsamplingRounds( + List.of(new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueDays(7), new DateHistogramInterval("3h"))) + ) + .downsamplingMethod(DownsampleConfig.SamplingMethod.LAST_VALUE) + .buildTemplate(); + String ct60d = "ct_60d"; + project = addComponentTemplate(service, project, ct60d, lifecycle60d); DataStreamLifecycle.Template lifecycleNullRetention = DataStreamLifecycle.createDataLifecycleTemplate( true, ResettableValue.reset(), + ResettableValue.undefined(), ResettableValue.undefined() ); String ctNullRetention = "ct_null_retention"; project = addComponentTemplate(service, project, ctNullRetention, lifecycleNullRetention); + DataStreamLifecycle.Template lifecycleNullDownsampling = DataStreamLifecycle.createDataLifecycleTemplate( + true, + ResettableValue.undefined(), + ResettableValue.reset(), + ResettableValue.reset() + ); + String ctNullDownsampling = "ct_null_downsampling"; + project = addComponentTemplate(service, project, ctNullDownsampling, lifecycleNullDownsampling); + String ctEmptyLifecycle = "ct_empty_lifecycle"; project = addComponentTemplate(service, project, ctEmptyLifecycle, emptyLifecycle); @@ -1167,7 +1181,7 @@ public void testResolveLifecycle() throws Exception { lifecycle30d, DataStreamLifecycle.dataLifecycleBuilder() .dataRetention(lifecycle30d.dataRetention()) - .downsampling(lifecycle45d.downsampling()) + .downsamplingRounds(lifecycle45d.downsamplingRounds()) .buildTemplate() ); @@ -1189,7 +1203,7 @@ public void testResolveLifecycle() throws Exception { project, List.of(ctEmptyLifecycle, ct45d), lifecycleNullRetention, - DataStreamLifecycle.dataLifecycleBuilder().downsampling(lifecycle45d.downsampling()).buildTemplate() + DataStreamLifecycle.dataLifecycleBuilder().downsamplingRounds(lifecycle45d.downsamplingRounds()).buildTemplate() ); // Component A: "lifecycle": {"retention": "30d"} @@ -1203,7 +1217,7 @@ public void testResolveLifecycle() throws Exception { DataStreamLifecycle.dataLifecycleBuilder().enabled(false).buildTemplate(), DataStreamLifecycle.dataLifecycleBuilder() .dataRetention(lifecycle45d.dataRetention()) - .downsampling(lifecycle45d.downsampling()) + .downsamplingRounds(lifecycle45d.downsamplingRounds()) .enabled(false) .buildTemplate() ); @@ -1225,6 +1239,72 @@ public void testResolveLifecycle() throws Exception { // Composable Z: "lifecycle": {"retention": "45d", "downsampling": [{"after": "30d", "fixed_interval": "3h"}]} // Result: "lifecycle": {"retention": "45d", "downsampling": [{"after": "30d", "fixed_interval": "3h"}]} assertLifecycleResolution(service, project, List.of(ct30d, ctDisabledLifecycle), lifecycle45d, lifecycle45d); + + // Component A: "lifecycle": { + // "retention": "60d", + // "downsampling_method": "last_value", + // "downsampling": [{"after": "3d", "fixed_interval": "3h"}] + // } + // Composable Z: "lifecycle": {"retention": "45d", "downsampling": [{"after": "30d", "fixed_interval": "3h"}]} + // Result: "lifecycle": { + // "retention": "45d", + // "downsampling": [{"after": "30d", "fixed_interval": "3h"}], + // "downsampling_method": "last_value" + // } + assertLifecycleResolution( + service, + project, + List.of(ct60d), + lifecycle45d, + DataStreamLifecycle.dataLifecycleBuilder() + .dataRetention(lifecycle45d.dataRetention()) + .downsamplingMethod(lifecycle60d.downsamplingMethod()) + .downsamplingRounds(lifecycle45d.downsamplingRounds()) + .buildTemplate() + ); + + // Component A: "lifecycle": { + // "retention": "60d", + // "downsampling_method": "last_value", + // "downsampling": [{"after": "3d", "fixed_interval": "3h"}] + // } + // Component B: "lifecycle": {"retention": "45d", "downsampling": [{"after": "30d", "fixed_interval": "3h"}]} + // Composable Z: "lifecycle": {"downsampling": null, "downsampling_method": null} + // Result: "lifecycle": {"retention": "45d"} + assertLifecycleResolution( + service, + project, + List.of(ct60d, ct45d), + lifecycleNullDownsampling, + DataStreamLifecycle.dataLifecycleBuilder().dataRetention(lifecycle45d.dataRetention()).buildTemplate() + ); + + // Component A: "lifecycle": { + // "retention": "60d", + // "downsampling_method": "last_value", + // "downsampling": [{"after": "3d", "fixed_interval": "3h"}] + // } + // Composable Z: "lifecycle": {"retention": "45d", "downsampling": [{"after": "30d", "fixed_interval": "3h"}]} + // Result: "lifecycle": { + // "retention": "45d", + // "downsampling": [{"after": "30d", "fixed_interval": "3h"}], + // "downsampling_method": "last_value" + // } + assertLifecycleResolution( + service, + project, + List.of(ct60d), + DataStreamLifecycle.createDataLifecycleTemplate( + true, + ResettableValue.undefined(), + ResettableValue.undefined(), + ResettableValue.reset() + ), + DataStreamLifecycle.dataLifecycleBuilder() + .dataRetention(lifecycle60d.dataRetention()) + .downsamplingRounds(lifecycle60d.downsamplingRounds()) + .buildTemplate() + ); } public void testResolveFailureStore() throws Exception { diff --git a/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportActionIT.java b/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportActionIT.java index f9c326c1ac854..8330f7c92a8f9 100644 --- a/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportActionIT.java +++ b/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportActionIT.java @@ -7,7 +7,6 @@ package org.elasticsearch.xpack.core.action; -import org.elasticsearch.action.downsample.DownsampleConfig; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; @@ -164,7 +163,7 @@ public void testAction() throws Exception { timeSeriesDataStreamCount.incrementAndGet(); if (downsamplingConfiguredBy == DownsampledBy.DLM) { dlmDownsampledDataStreamCount.incrementAndGet(); - updateRounds(lifecycle.downsampling().size(), dlmRoundsCount, dlmRoundsSum, dlmRoundsMin, dlmRoundsMax); + updateRounds(lifecycle.downsamplingRounds().size(), dlmRoundsCount, dlmRoundsSum, dlmRoundsMin, dlmRoundsMax); } else if (downsamplingConfiguredBy == DownsampledBy.ILM) { ilmDownsampledDataStreamCount.incrementAndGet(); } @@ -410,7 +409,7 @@ private DataStreamLifecycle maybeCreateLifecycle(boolean isDownsampled, boolean } var builder = DataStreamLifecycle.dataLifecycleBuilder(); if (isDownsampled) { - builder.downsampling(randomDownsamplingRounds()); + builder.downsamplingRounds(randomDownsamplingRounds()); } return builder.build(); } @@ -471,12 +470,7 @@ private List randomDownsamplingRounds() { int minutes = 5; int days = 1; for (int i = 0; i < randomIntBetween(1, 10); i++) { - rounds.add( - new DataStreamLifecycle.DownsamplingRound( - TimeValue.timeValueDays(days), - new DownsampleConfig(new DateHistogramInterval(minutes + "m")) - ) - ); + rounds.add(new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueDays(days), new DateHistogramInterval(minutes + "m"))); minutes *= randomIntBetween(2, 5); days += randomIntBetween(1, 5); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportAction.java index 77245a396e5db..5e980387bc4f8 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportAction.java @@ -89,9 +89,9 @@ protected void localClusterStateOperation( continue; } tsDataStreamCount++; - Integer dlmRounds = ds.getDataLifecycle() == null || ds.getDataLifecycle().downsampling() == null + Integer dlmRounds = ds.getDataLifecycle() == null || ds.getDataLifecycle().downsamplingRounds() == null ? null - : ds.getDataLifecycle().downsampling().size(); + : ds.getDataLifecycle().downsamplingRounds().size(); for (Index backingIndex : ds.getIndices()) { IndexMetadata indexMetadata = projectMetadata.index(backingIndex); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/datastreams/DataStreamLifecycleFeatureSetUsageTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/datastreams/DataStreamLifecycleFeatureSetUsageTests.java index fa4e7ead7eaf9..1516353731b56 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/datastreams/DataStreamLifecycleFeatureSetUsageTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/datastreams/DataStreamLifecycleFeatureSetUsageTests.java @@ -117,7 +117,7 @@ public void testLifecycleStats() { 1L, null, false, - DataStreamLifecycle.createDataLifecycle(true, TimeValue.timeValueSeconds(50), null) + DataStreamLifecycle.createDataLifecycle(true, TimeValue.timeValueSeconds(50), null, null) ), DataStreamTestHelper.newInstance( randomAlphaOfLength(10), @@ -125,7 +125,7 @@ public void testLifecycleStats() { 1L, null, false, - DataStreamLifecycle.createDataLifecycle(true, TimeValue.timeValueMillis(150), null) + DataStreamLifecycle.createDataLifecycle(true, TimeValue.timeValueMillis(150), null, null) ), DataStreamTestHelper.newInstance( randomAlphaOfLength(10), @@ -133,7 +133,7 @@ public void testLifecycleStats() { 1L, null, false, - DataStreamLifecycle.createDataLifecycle(false, TimeValue.timeValueSeconds(5), null) + DataStreamLifecycle.createDataLifecycle(false, TimeValue.timeValueSeconds(5), null, null) ), DataStreamTestHelper.newInstance( randomAlphaOfLength(10), diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java index b07a00c13903c..8fff5bb52a75f 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java @@ -11,7 +11,6 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.indices.rollover.RolloverAction; import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; -import org.elasticsearch.action.downsample.DownsampleConfig; import org.elasticsearch.cluster.metadata.DataStreamLifecycle; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; @@ -52,13 +51,8 @@ public void testDataStreamLifecycleDownsampleRollingRestart() throws Exception { final String dataStreamName = "metrics-foo"; DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.dataLifecycleBuilder() - .downsampling( - List.of( - new DataStreamLifecycle.DownsamplingRound( - TimeValue.timeValueMillis(0), - new DownsampleConfig(new DateHistogramInterval("5m")) - ) - ) + .downsamplingRounds( + List.of(new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueMillis(0), new DateHistogramInterval("5m"))) ) .buildTemplate(); setupTSDBDataStreamAndIngestDocs( diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleIT.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleIT.java index 114bdeeff7e98..7cfbdef90a2bd 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleIT.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleIT.java @@ -9,13 +9,19 @@ import org.elasticsearch.action.admin.indices.rollover.RolloverAction; import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; +import org.elasticsearch.action.admin.indices.rollover.RolloverResponse; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.action.datastreams.lifecycle.PutDataStreamLifecycleAction; import org.elasticsearch.action.downsample.DownsampleConfig; import org.elasticsearch.cluster.metadata.DataStreamLifecycle; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.junit.annotations.TestLogging; import java.util.HashSet; @@ -26,6 +32,7 @@ import static org.elasticsearch.cluster.metadata.ClusterChangedEventUtils.indicesCreated; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; public class DataStreamLifecycleDownsampleIT extends DownsamplingIntegTestCase { @@ -42,17 +49,13 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { public void testDownsampling() throws Exception { String dataStreamName = "metrics-foo"; + DownsampleConfig.SamplingMethod downsamplingMethod = randomSamplingMethod(); DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.dataLifecycleBuilder() - .downsampling( + .downsamplingMethod(downsamplingMethod) + .downsamplingRounds( List.of( - new DataStreamLifecycle.DownsamplingRound( - TimeValue.timeValueMillis(0), - new DownsampleConfig(new DateHistogramInterval("5m")) - ), - new DataStreamLifecycle.DownsamplingRound( - TimeValue.timeValueSeconds(10), - new DownsampleConfig(new DateHistogramInterval("10m")) - ) + new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueMillis(0), new DateHistogramInterval("5m")), + new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueSeconds(10), new DateHistogramInterval("10m")) ) ) .buildTemplate(); @@ -68,34 +71,34 @@ public void testDownsampling() throws Exception { List backingIndices = getDataStreamBackingIndexNames(dataStreamName); String firstGenerationBackingIndex = backingIndices.get(0); - String oneSecondDownsampleIndex = "downsample-5m-" + firstGenerationBackingIndex; - String tenSecondsDownsampleIndex = "downsample-10m-" + firstGenerationBackingIndex; + String fiveMinuteDownsampleIndex = "downsample-5m-" + firstGenerationBackingIndex; + String tenMinuteDownsampleIndex = "downsample-10m-" + firstGenerationBackingIndex; Set witnessedDownsamplingIndices = new HashSet<>(); clusterService().addListener(event -> { - if (indicesCreated(event).contains(oneSecondDownsampleIndex) - || event.indicesDeleted().stream().anyMatch(index -> index.getName().equals(oneSecondDownsampleIndex))) { - witnessedDownsamplingIndices.add(oneSecondDownsampleIndex); + if (indicesCreated(event).contains(fiveMinuteDownsampleIndex) + || event.indicesDeleted().stream().anyMatch(index -> index.getName().equals(fiveMinuteDownsampleIndex))) { + witnessedDownsamplingIndices.add(fiveMinuteDownsampleIndex); } - if (indicesCreated(event).contains(tenSecondsDownsampleIndex)) { - witnessedDownsamplingIndices.add(tenSecondsDownsampleIndex); + if (indicesCreated(event).contains(tenMinuteDownsampleIndex)) { + witnessedDownsamplingIndices.add(tenMinuteDownsampleIndex); } }); // before we rollover we update the index template to remove the start/end time boundaries (they're there just to ease with - // testing so DSL doesn't have to wait for the end_time to lapse) + // testing, so DSL doesn't have to wait for the end_time to lapse) putTSDBIndexTemplate(dataStreamName, null, null, lifecycle); client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName, null)).actionGet(); assertBusy(() -> { // first downsampling round - assertThat(witnessedDownsamplingIndices.contains(oneSecondDownsampleIndex), is(true)); + assertThat(witnessedDownsamplingIndices.contains(fiveMinuteDownsampleIndex), is(true)); }, 30, TimeUnit.SECONDS); assertBusy(() -> { assertThat(witnessedDownsamplingIndices.size(), is(2)); - assertThat(witnessedDownsamplingIndices.contains(oneSecondDownsampleIndex), is(true)); - assertThat(witnessedDownsamplingIndices.contains(tenSecondsDownsampleIndex), is(true)); + assertThat(witnessedDownsamplingIndices.contains(fiveMinuteDownsampleIndex), is(true)); + assertThat(witnessedDownsamplingIndices.contains(tenMinuteDownsampleIndex), is(true)); }, 30, TimeUnit.SECONDS); assertBusy(() -> { @@ -105,28 +108,25 @@ public void testDownsampling() throws Exception { String writeIndex = dsBackingIndices.get(1); assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2)); // the last downsampling round must remain in the data stream - assertThat(dsBackingIndices.get(0), is(tenSecondsDownsampleIndex)); - assertThat(indexExists(oneSecondDownsampleIndex), is(false)); + assertThat(dsBackingIndices.get(0), is(tenMinuteDownsampleIndex)); + assertThat(indexExists(fiveMinuteDownsampleIndex), is(false)); }, 30, TimeUnit.SECONDS); + assertDownsamplingMethod(downsamplingMethod, tenMinuteDownsampleIndex); } @TestLogging(value = "org.elasticsearch.datastreams.lifecycle:TRACE", reason = "debugging") public void testDownsamplingOnlyExecutesTheLastMatchingRound() throws Exception { String dataStreamName = "metrics-bar"; + DownsampleConfig.SamplingMethod downsamplingMethod = randomSamplingMethod(); DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.dataLifecycleBuilder() - .downsampling( + .downsamplingMethod(downsamplingMethod) + .downsamplingRounds( List.of( - new DataStreamLifecycle.DownsamplingRound( - TimeValue.timeValueMillis(0), - new DownsampleConfig(new DateHistogramInterval("5m")) - ), + new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueMillis(0), new DateHistogramInterval("5m")), // data stream lifecycle runs every 1 second, so by the time we forcemerge the backing index it would've been at // least 2 seconds since rollover. only the 10 seconds round should be executed. - new DataStreamLifecycle.DownsamplingRound( - TimeValue.timeValueMillis(10), - new DownsampleConfig(new DateHistogramInterval("10m")) - ) + new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueMillis(10), new DateHistogramInterval("10m")) ) ) .buildTemplate(); @@ -141,28 +141,28 @@ public void testDownsamplingOnlyExecutesTheLastMatchingRound() throws Exception List backingIndices = getDataStreamBackingIndexNames(dataStreamName); String firstGenerationBackingIndex = backingIndices.get(0); - String oneSecondDownsampleIndex = "downsample-5m-" + firstGenerationBackingIndex; - String tenSecondsDownsampleIndex = "downsample-10m-" + firstGenerationBackingIndex; + String fiveMinuteDownsampleIndex = "downsample-5m-" + firstGenerationBackingIndex; + String tenMinuteDownsampleIndex = "downsample-10m-" + firstGenerationBackingIndex; Set witnessedDownsamplingIndices = new HashSet<>(); clusterService().addListener(event -> { - if (indicesCreated(event).contains(oneSecondDownsampleIndex) - || event.indicesDeleted().stream().anyMatch(index -> index.getName().equals(oneSecondDownsampleIndex))) { - witnessedDownsamplingIndices.add(oneSecondDownsampleIndex); + if (indicesCreated(event).contains(fiveMinuteDownsampleIndex) + || event.indicesDeleted().stream().anyMatch(index -> index.getName().equals(fiveMinuteDownsampleIndex))) { + witnessedDownsamplingIndices.add(fiveMinuteDownsampleIndex); } - if (indicesCreated(event).contains(tenSecondsDownsampleIndex)) { - witnessedDownsamplingIndices.add(tenSecondsDownsampleIndex); + if (indicesCreated(event).contains(tenMinuteDownsampleIndex)) { + witnessedDownsamplingIndices.add(tenMinuteDownsampleIndex); } }); // before we rollover we update the index template to remove the start/end time boundaries (they're there just to ease with - // testing so DSL doesn't have to wait for the end_time to lapse) + // testing, so DSL doesn't have to wait for the end_time to lapse) putTSDBIndexTemplate(dataStreamName, null, null, lifecycle); client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName, null)).actionGet(); assertBusy(() -> { assertThat(witnessedDownsamplingIndices.size(), is(1)); // only the ten seconds downsample round should've been executed - assertThat(witnessedDownsamplingIndices.contains(tenSecondsDownsampleIndex), is(true)); + assertThat(witnessedDownsamplingIndices.contains(tenMinuteDownsampleIndex), is(true)); }, 30, TimeUnit.SECONDS); assertBusy(() -> { @@ -171,8 +171,9 @@ public void testDownsamplingOnlyExecutesTheLastMatchingRound() throws Exception assertThat(dsBackingIndices.size(), is(2)); String writeIndex = dsBackingIndices.get(1); assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2)); - assertThat(dsBackingIndices.get(0), is(tenSecondsDownsampleIndex)); + assertThat(dsBackingIndices.get(0), is(tenMinuteDownsampleIndex)); }, 30, TimeUnit.SECONDS); + assertDownsamplingMethod(downsamplingMethod, tenMinuteDownsampleIndex); } @TestLogging(value = "org.elasticsearch.datastreams.lifecycle:TRACE", reason = "debugging") @@ -181,19 +182,15 @@ public void testUpdateDownsampleRound() throws Exception { // we expect the earlier round to be ignored String dataStreamName = "metrics-baz"; + DownsampleConfig.SamplingMethod downsamplingMethod = randomSamplingMethod(); DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.dataLifecycleBuilder() - .downsampling( + .downsamplingMethod(downsamplingMethod) + .downsamplingRounds( List.of( - new DataStreamLifecycle.DownsamplingRound( - TimeValue.timeValueMillis(0), - new DownsampleConfig(new DateHistogramInterval("5m")) - ), + new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueMillis(0), new DateHistogramInterval("5m")), // data stream lifecycle runs every 1 second, so by the time we forcemerge the backing index it would've been at - // least 2 seconds since rollover. only the 10 seconds round should be executed. - new DataStreamLifecycle.DownsamplingRound( - TimeValue.timeValueMillis(10), - new DownsampleConfig(new DateHistogramInterval("10m")) - ) + // least 2 seconds since rollover. Only the 10 seconds round should be executed. + new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueMillis(10), new DateHistogramInterval("10m")) ) ) .buildTemplate(); @@ -243,15 +240,10 @@ public void testUpdateDownsampleRound() throws Exception { // update the lifecycle so that it only has one round, for the same `after` parameter as before, but a different interval // the different interval should yield a different downsample index name so we expect the data stream lifecycle to get the previous - // `10s` interval downsample index, downsample it to `20m` and replace it in the data stream instead of the `10s` one. + // `10m` interval downsample index, downsample it to `20m` and replace it in the data stream instead of the `10m` one. DataStreamLifecycle updatedLifecycle = DataStreamLifecycle.dataLifecycleBuilder() - .downsampling( - List.of( - new DataStreamLifecycle.DownsamplingRound( - TimeValue.timeValueMillis(10), - new DownsampleConfig(new DateHistogramInterval("20m")) - ) - ) + .downsamplingRounds( + List.of(new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueMillis(10), new DateHistogramInterval("20m"))) ) .build(); assertAcked( @@ -277,5 +269,111 @@ public void testUpdateDownsampleRound() throws Exception { assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2)); assertThat(dsBackingIndices.get(0), is(thirtySecondsDownsampleIndex)); }, 30, TimeUnit.SECONDS); + assertDownsamplingMethod(downsamplingMethod, thirtySecondsDownsampleIndex); + } + + /** + * This test ensures that when we change the sampling method, the already downsampled indices will use the original sampling method, + * while the raw data ones will be downsampled with the most recent configuration. + * To achieve that, we set the following test: + * 1. Create a data stream that is downsampled with a sampling method. + * 2. Rollover and wait for the downsampling to occur + * 3. Double the downsample interval (so it can downsample the first index as well) and change the sampling method. + * 4. Rollover and wait for both indices to be downsampled with the new interval + * 5. Check that the indices have been downsampled with the correct method. + */ + public void testUpdateDownsampleSamplingMode() throws Exception { + String dataStreamName = "metrics-baz"; + DownsampleConfig.SamplingMethod initialSamplingMethod = randomBoolean() + ? null + : randomFrom(DownsampleConfig.SamplingMethod.values()); + DownsampleConfig.SamplingMethod updatedSamplingMethod = initialSamplingMethod == DownsampleConfig.SamplingMethod.LAST_VALUE + ? (randomBoolean() ? null : DownsampleConfig.SamplingMethod.AGGREGATE) + : DownsampleConfig.SamplingMethod.LAST_VALUE; + + DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.dataLifecycleBuilder() + .downsamplingMethod(initialSamplingMethod) + .downsamplingRounds( + List.of(new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueMillis(10), new DateHistogramInterval("5m"))) + ) + .buildTemplate(); + + // Start and end time there just to ease with testing, so DLM doesn't have to wait for the end_time to lapse + // Creating the first backing index. + setupTSDBDataStreamAndIngestDocs( + dataStreamName, + "1986-01-08T23:40:53.384Z", + "2022-01-08T23:40:53.384Z", + lifecycle, + DOC_COUNT, + "1990-09-09T18:00:00" + ); + + // before we roll over, we update the index template to have new start/end time boundaries + // Creating the second backing index. + putTSDBIndexTemplate(dataStreamName, "2022-01-08T23:40:53.384Z", "2023-01-08T23:40:53.384Z", lifecycle); + RolloverResponse rolloverResponse = safeGet(client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName, null))); + assertTrue(rolloverResponse.isRolledOver()); + String firstBackingIndex = rolloverResponse.getOldIndex(); + String secondBackingIndex = rolloverResponse.getNewIndex(); + indexDocuments(dataStreamName, randomIntBetween(1, 1000), "2022-01-08T23:50:00"); + + // Ensure that the first backing index has been downsampled + awaitClusterState(clusterState -> { + final var dataStream = clusterState.metadata().getProject().dataStreams().get(dataStreamName); + if (dataStream == null) { + return false; + } + return dataStream.getIndices().size() > 1 && dataStream.getIndices().getFirst().getName().startsWith("downsample-"); + }); + assertDownsamplingMethod(initialSamplingMethod, "downsample-5m-" + firstBackingIndex); + // We change the sampling method, but also we double the downsampling interval. We expect the data stream lifecycle to get the + // previous `5m` interval downsampled index, downsample it to `10m` and replace it in the data stream with the `5m` one. + DataStreamLifecycle updatedLifecycle = DataStreamLifecycle.dataLifecycleBuilder() + .downsamplingMethod(updatedSamplingMethod) + .downsamplingRounds( + List.of(new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueMillis(5), new DateHistogramInterval("10m"))) + ) + .build(); + assertAcked( + client().execute( + PutDataStreamLifecycleAction.INSTANCE, + new PutDataStreamLifecycleAction.Request( + TEST_REQUEST_TIMEOUT, + TEST_REQUEST_TIMEOUT, + new String[] { dataStreamName }, + updatedLifecycle + ) + ) + ); + + // We roll over one more time, so the second backing index will be eligible for downsampling + putTSDBIndexTemplate(dataStreamName, null, null, lifecycle); + rolloverResponse = safeGet(client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName, null))); + assertTrue(rolloverResponse.isRolledOver()); + String downsampledPrefix = "downsample-10m-"; + final var waitForUpdatedDownsamplingRound = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { + ProjectMetadata projectMetadata = clusterState.metadata().getProject(); + final var dataStream = projectMetadata.dataStreams().get(dataStreamName); + if (dataStream == null) { + return false; + } + + return dataStream.getIndices().size() > 2 + && dataStream.getIndices().stream().filter(index -> index.getName().startsWith(downsampledPrefix)).count() == 2; + }); + safeAwait(waitForUpdatedDownsamplingRound); + assertDownsamplingMethod(initialSamplingMethod, downsampledPrefix + firstBackingIndex); + assertDownsamplingMethod(updatedSamplingMethod, downsampledPrefix + secondBackingIndex); + } + + private void assertDownsamplingMethod(DownsampleConfig.SamplingMethod downsamplingMethod, String... indexNames) { + String expected = DownsampleConfig.SamplingMethod.getOrDefault(downsamplingMethod).toString(); + GetSettingsResponse response = safeGet( + client().admin().indices().getSettings(new GetSettingsRequest(TimeValue.THIRTY_SECONDS).indices(indexNames)) + ); + for (String indexName : indexNames) { + assertThat(response.getSetting(indexName, IndexMetadata.INDEX_DOWNSAMPLE_METHOD_KEY), equalTo(expected)); + } } } diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java index 8a3758cd57249..afb38c99aa06a 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java @@ -863,15 +863,16 @@ private static void validateDownsamplingConfiguration( Map meta = timestampFieldType.meta(); if (meta.isEmpty() == false) { - String interval = meta.get(config.getIntervalType()); - DownsampleConfig.SamplingMethod sourceSamplingMethod = DownsampleConfig.SamplingMethod.fromIndexMetadata(sourceIndexMetadata); - if (interval != null) { + String sourceInterval = meta.get(config.getIntervalType()); + if (sourceInterval != null) { try { - DownsampleConfig sourceConfig = new DownsampleConfig(new DateHistogramInterval(interval), sourceSamplingMethod); - DownsampleConfig.validateSourceAndTargetIntervals(sourceConfig, config); + DownsampleConfig.validateSourceAndTargetIntervals(new DateHistogramInterval(sourceInterval), config.getFixedInterval()); } catch (IllegalArgumentException exception) { e.addValidationError("Source index is a downsampled index. " + exception.getMessage()); } + DownsampleConfig.SamplingMethod sourceSamplingMethod = DownsampleConfig.SamplingMethod.fromIndexMetadata( + sourceIndexMetadata + ); if (Objects.equals(sourceSamplingMethod, config.getSamplingMethodOrDefault()) == false) { e.addValidationError( "Source index is a downsampled index. Downsampling method [" diff --git a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutorTests.java b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutorTests.java index 8f0254cdf586b..40bcfdac5b00a 100644 --- a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutorTests.java +++ b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutorTests.java @@ -157,7 +157,7 @@ public void testGetStatelessAssignment() { .build(); var params = new DownsampleShardTaskParams( - new DownsampleConfig(new DateHistogramInterval("1h")), + new DownsampleConfig(new DateHistogramInterval("1h"), randomSamplingMethod()), shardId.getIndexName(), 1, 1, diff --git a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleDownsamplingSecurityIT.java b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleDownsamplingSecurityIT.java index 9def00c964a7d..c37cd36dacb64 100644 --- a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleDownsamplingSecurityIT.java +++ b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleDownsamplingSecurityIT.java @@ -121,16 +121,11 @@ public void testDownsamplingAuthorized() throws Exception { String dataStreamName = "metrics-foo"; DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.dataLifecycleBuilder() - .downsampling( + .downsamplingMethod(randomSamplingMethod()) + .downsamplingRounds( List.of( - new DataStreamLifecycle.DownsamplingRound( - TimeValue.timeValueMillis(0), - new DownsampleConfig(new DateHistogramInterval("5m")) - ), - new DataStreamLifecycle.DownsamplingRound( - TimeValue.timeValueSeconds(10), - new DownsampleConfig(new DateHistogramInterval("10m")) - ) + new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueMillis(0), new DateHistogramInterval("5m")), + new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueSeconds(10), new DateHistogramInterval("10m")) ) ) .buildTemplate(); @@ -408,19 +403,22 @@ private void bulkIndex(Client client, String dataStreamName, Supplier Indexed [{}] documents. Dropped [{}] duplicates.", docsIndexed, duplicates); } + private static DownsampleConfig.SamplingMethod randomSamplingMethod() { + if (between(0, DownsampleConfig.SamplingMethod.values().length) == 0) { + return null; + } else { + return randomFrom(DownsampleConfig.SamplingMethod.values()); + } + } + public static class SystemDataStreamWithDownsamplingConfigurationPlugin extends Plugin implements SystemIndexPlugin { public static final DataStreamLifecycle.Template LIFECYCLE = DataStreamLifecycle.dataLifecycleBuilder() - .downsampling( + .downsamplingMethod(randomSamplingMethod()) + .downsamplingRounds( List.of( - new DataStreamLifecycle.DownsamplingRound( - TimeValue.timeValueMillis(0), - new DownsampleConfig(new DateHistogramInterval("5m")) - ), - new DataStreamLifecycle.DownsamplingRound( - TimeValue.timeValueSeconds(10), - new DownsampleConfig(new DateHistogramInterval("10m")) - ) + new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueMillis(0), new DateHistogramInterval("5m")), + new DataStreamLifecycle.DownsamplingRound(TimeValue.timeValueSeconds(10), new DateHistogramInterval("10m")) ) ) .buildTemplate();