@@ -0,0 +1,56 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.indices;

import org.opensearch.core.common.bytes.BytesReference;

import java.io.IOException;

/**
* A policy which decides whether data, provided in serialized form, may be admitted into a particular cache tier.
* @param <T> the type the serialized data is deserialized into when making the admission decision
*/
public interface CacheTierPolicy<T> {
/**
* Determines whether this policy allows the data into its cache tier, based on the contents of the BytesReference,
* which can be deserialized into class T.
* @param data A BytesReference which can be deserialized into class T
* @return true if the data is admitted into this cache tier, false otherwise
* @throws IOException if the input can't be deserialized to the right class.
*/
boolean checkData(BytesReference data) throws IOException;

/**
* Convert the BytesReference into the type T that is used to check entry into the cache.
* @param data The BytesReference
* @return The BytesReference converted to type T
* @throws IOException if the input can't be deserialized to the right class.
*/
T convertFromBytesReference(BytesReference data) throws IOException;
}
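
For illustration, here is a minimal sketch (not part of this change) of another possible implementation of this interface; the class name MaxSizeTierPolicy and its byte limit are hypothetical, and the deserialization mirrors the pattern used by DiskTierTookTimePolicy below.

// Hypothetical example only: admits a serialized QuerySearchResult into a cache tier
// when its serialized size is below a fixed limit.
package org.opensearch.indices;

import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.search.query.QuerySearchResult;

import java.io.IOException;

public class MaxSizeTierPolicy implements CacheTierPolicy<QuerySearchResult> {
    private final long maxSizeBytes; // illustrative limit supplied by the caller

    public MaxSizeTierPolicy(long maxSizeBytes) {
        this.maxSizeBytes = maxSizeBytes;
    }

    @Override
    public QuerySearchResult convertFromBytesReference(BytesReference data) throws IOException {
        try {
            return new QuerySearchResult(data.streamInput());
        } catch (IllegalStateException ise) {
            throw new IOException(ise); // surface deserialization failures as IOException, per the interface contract
        }
    }

    @Override
    public boolean checkData(BytesReference data) throws IOException {
        convertFromBytesReference(data); // confirm the bytes really deserialize into a QuerySearchResult
        return data.length() < maxSizeBytes;
    }
}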
@@ -0,0 +1,91 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.indices;

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.search.query.QuerySearchResult;

import java.io.IOException;

/**
* A cache tier policy which accepts query results whose took time is greater than or equal to some threshold,
* which is specified as a dynamic cluster-level setting. The threshold should be set to approximately
* the time it takes to get a result from the cache tier.
*/
public class DiskTierTookTimePolicy implements CacheTierPolicy<QuerySearchResult> {
public static final Setting<TimeValue> INDICES_REQUEST_CACHE_DISK_TOOKTIME_THRESHOLD_SETTING = Setting.positiveTimeSetting(
"index.requests.cache.disk.tooktime.threshold",
new TimeValue(10),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

private TimeValue threshold;

public DiskTierTookTimePolicy(Settings settings, ClusterSettings clusterSettings) {
this.threshold = INDICES_REQUEST_CACHE_DISK_TOOKTIME_THRESHOLD_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(INDICES_REQUEST_CACHE_DISK_TOOKTIME_THRESHOLD_SETTING, this::setThreshold);
}

protected void setThreshold(TimeValue threshold) { // protected (not private) so that unit tests can set the value manually
this.threshold = threshold;
}

@Override
public QuerySearchResult convertFromBytesReference(BytesReference data) throws IOException {
try {
return new QuerySearchResult(data.streamInput());
} catch (IllegalStateException ise) {
throw new IOException(ise);
}
}

@Override
public boolean checkData(BytesReference data) throws IOException {
QuerySearchResult qsr = convertFromBytesReference(data);
Long tookTimeNanos = qsr.getTookTimeNanos();
if (tookTimeNanos == null) {
// A null took time means the QSR came from an old version which does not record took time; accept it
return true;
}
TimeValue tookTime = TimeValue.timeValueNanos(tookTimeNanos);
// A negative comparison means tookTime is shorter than the threshold, so the result is not admitted to the disk tier
return tookTime.compareTo(threshold) >= 0;
}
}
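
As a usage sketch, a caller holding a DiskTierTookTimePolicy might gate disk-tier admission as shown below; the diskTier field, its put() method, and the key variable are assumptions made only for illustration.

// Hypothetical caller, not part of this PR: only spill results to the disk tier if the policy admits them.
private void maybeCacheOnDisk(String key, BytesReference serializedResult) {
    try {
        if (tookTimePolicy.checkData(serializedResult)) {
            diskTier.put(key, serializedResult); // the query took long enough that a disk-tier hit is still a win
        }
    } catch (IOException e) {
        // the bytes could not be deserialized into a QuerySearchResult; skip disk caching
    }
}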
@@ -112,6 +112,7 @@
import org.opensearch.indices.IndexingMemoryController;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.IndicesRequestCache;
import org.opensearch.indices.DiskTierTookTimePolicy;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.ShardLimitValidator;
import org.opensearch.indices.analysis.HunspellService;
@@ -671,7 +672,9 @@ public void apply(Settings value, Settings current, Settings previous) {
// Remote cluster state settings
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,

DiskTierTookTimePolicy.INDICES_REQUEST_CACHE_DISK_TOOKTIME_THRESHOLD_SETTING
)
)
);
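
Because the setting is dynamic, the threshold registered above can be changed at runtime through the cluster settings update API; the snippet below is a hedged sketch of that call, and the client variable is an assumption.

// Hypothetical usage, not part of this PR: raise the disk-tier took-time threshold to 50ms at runtime.
client.admin()
    .cluster()
    .prepareUpdateSettings()
    .setPersistentSettings(Settings.builder().put("index.requests.cache.disk.tooktime.threshold", "50ms"))
    .get();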
@@ -131,13 +131,15 @@ public void preProcess(SearchContext context) {
}

public void execute(SearchContext searchContext) throws QueryPhaseExecutionException {
long startTime = System.nanoTime();
if (searchContext.hasOnlySuggest()) {
suggestProcessor.process(searchContext);
searchContext.queryResult()
.topDocs(
new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), Lucene.EMPTY_SCORE_DOCS), Float.NaN),
new DocValueFormat[0]
);
searchContext.queryResult().setTookTimeNanos(System.nanoTime() - startTime);
return;
}

@@ -165,6 +167,7 @@ public void execute(SearchContext searchContext) throws QueryPhaseExecutionException {
);
searchContext.queryResult().profileResults(shardResults);
}
searchContext.queryResult().setTookTimeNanos(System.nanoTime() - startTime);
}

// making public for testing
@@ -292,7 +295,6 @@ static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher q
queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize());
queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA());
}

return shouldRescore;
} finally {
// Search phase has finished, no longer need to check for timeout
@@ -87,6 +87,7 @@ public final class QuerySearchResult extends SearchPhaseResult {
private int nodeQueueSize = -1;

private final boolean isNull;
private long tookTimeNanos;

public QuerySearchResult() {
this(false);
@@ -364,6 +365,7 @@ public void readFromWithId(ShardSearchContextId id, StreamInput in) throws IOException {
nodeQueueSize = in.readInt();
setShardSearchRequest(in.readOptionalWriteable(ShardSearchRequest::new));
setRescoreDocIds(new RescoreDocIds(in));
tookTimeNanos = in.readVLong();
}

@Override
@@ -406,6 +408,7 @@ public void writeToNoId(StreamOutput out) throws IOException {
out.writeInt(nodeQueueSize);
out.writeOptionalWriteable(getShardSearchRequest());
getRescoreDocIds().writeTo(out);
out.writeVLong(tookTimeNanos); // written as a VLong since took time should always be non-negative
}

public TotalHits getTotalHits() {
@@ -415,4 +418,12 @@ public TotalHits getTotalHits() {
public float getMaxScore() {
return maxScore;
}

public long getTookTimeNanos() {
return tookTimeNanos;
}

public void setTookTimeNanos(long tookTime) {
tookTimeNanos = tookTime;
}
}
@@ -0,0 +1,132 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.indices;

import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHits;
import org.opensearch.action.OriginalIndices;
import org.opensearch.action.OriginalIndicesTests;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.common.UUIDs;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lucene.search.TopDocsAndMaxScore;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.SearchShardTarget;
import org.opensearch.search.internal.AliasFilter;
import org.opensearch.search.internal.ShardSearchContextId;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;

public class IndicesRequestCacheDiskTierPolicyTests extends OpenSearchTestCase {
private DiskTierTookTimePolicy getTookTimePolicy() {
// dummy settings
Settings dummySettings = Settings.EMPTY;
ClusterSettings dummyClusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
return new DiskTierTookTimePolicy(dummySettings, dummyClusterSettings);
}

public void testQSRSetupFunction() throws IOException {
long ttn = 100000000000L;
BytesReference qsrBytes = getQSRBytesReference(ttn);
QuerySearchResult qsr = new QuerySearchResult(qsrBytes.streamInput());
assertEquals(ttn, qsr.getTookTimeNanos());
}

public void testBadBytesReference() throws Exception {
BytesReference badQSR = new BytesArray("I love bytes!!!");
DiskTierTookTimePolicy tookTimePolicy = getTookTimePolicy();
assertThrows(IOException.class, () -> tookTimePolicy.checkData(badQSR));
}

public void testTookTimePolicy() throws Exception {
DiskTierTookTimePolicy tookTimePolicy = getTookTimePolicy();

// manually set threshold for test
double threshMillis = 10;
long shortMillis = (long) (0.9 * threshMillis);
long longMillis = (long) (1.5 * threshMillis);
tookTimePolicy.setThreshold(new TimeValue((long) threshMillis));
BytesReference shortQSR = getQSRBytesReference(shortMillis * 1000000);
BytesReference longQSR = getQSRBytesReference(longMillis * 1000000);

boolean shortResult = tookTimePolicy.checkData(shortQSR);
assertFalse(shortResult);
boolean longResult = tookTimePolicy.checkData(longQSR);
assertTrue(longResult);
}

private BytesReference getQSRBytesReference(long tookTimeNanos) throws IOException {
// setup from QuerySearchResultTests.java
ShardId shardId = new ShardId("index", "uuid", randomInt());
SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(randomBoolean());
ShardSearchRequest shardSearchRequest = new ShardSearchRequest(
OriginalIndicesTests.randomOriginalIndices(),
searchRequest,
shardId,
1,
new AliasFilter(null, Strings.EMPTY_ARRAY),
1.0f,
randomNonNegativeLong(),
null,
new String[0]
);
ShardSearchContextId id = new ShardSearchContextId(UUIDs.base64UUID(), randomLong());
QuerySearchResult result = new QuerySearchResult(
id,
new SearchShardTarget("node", shardId, null, OriginalIndices.NONE),
shardSearchRequest
);
TopDocs topDocs = new TopDocs(new TotalHits(randomLongBetween(0, Long.MAX_VALUE), TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]);
result.topDocs(new TopDocsAndMaxScore(topDocs, randomBoolean() ? Float.NaN : randomFloat()), new DocValueFormat[0]);

result.setTookTimeNanos(tookTimeNanos);
BytesStreamOutput out = new BytesStreamOutput();
// the QuerySearchResult(StreamInput) constructor reads an isNull boolean and a ShardSearchContextId before the fields written by writeToNoId
out.writeBoolean(false);
id.writeTo(out);

result.writeToNoId(out);
return out.bytes();
}
}