Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.legacy;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

import java.io.IOException;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.Before;
import org.junit.Test;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.ResponseException;
import org.opensearch.sql.legacy.utils.StringUtils;

/**
* Integration test verifying PIT contexts are created only when needed and properly cleaned up.
*
* @see <a href="https://github.com/opensearch-project/sql/issues/5002">Issue #5002</a>
*/
public class PointInTimeLeakIT extends SQLIntegTestCase {

private static final String TEST_INDEX = "test-logs-2025.01.01";
private static final String PIT_STATS_ENDPOINT =
"/_nodes/stats/indices/search?filter_path=nodes.*.indices.search.point_in_time_current";

@Before
public void setUpTestIndex() throws IOException {
try {
executeRequest(new Request("DELETE", "/" + TEST_INDEX));
} catch (ResponseException e) {
if (e.getResponse().getStatusLine().getStatusCode() != 404) {
throw e;
}
}

Request createIndex = new Request("PUT", "/" + TEST_INDEX);
createIndex.setJsonEntity(
"{ \"mappings\": { \"properties\": { \"action\": {\"type\": \"text\", \"fields\":"
+ " {\"keyword\": {\"type\": \"keyword\"}}}, \"timestamp\": {\"type\": \"date\"} "
+ " } }}");
executeRequest(createIndex);

Request bulkRequest = new Request("POST", "/" + TEST_INDEX + "/_bulk");
bulkRequest.addParameter("refresh", "true");
bulkRequest.setJsonEntity(
"{\"index\":{}}\n"
+ "{\"action\":\"login_success\",\"timestamp\":\"2025-01-01T10:00:00Z\"}\n"
+ "{\"index\":{}}\n"
+ "{\"action\":\"login_success\",\"timestamp\":\"2025-01-01T10:01:00Z\"}\n"
+ "{\"index\":{}}\n"
+ "{\"action\":\"login_failed\",\"timestamp\":\"2025-01-01T10:02:00Z\"}\n");
executeRequest(bulkRequest);
}

@Test
public void testNoPitLeakWithoutFetchSize() throws IOException, InterruptedException {
int baselinePitCount = getCurrentPitCount();

int numQueries = 10;

for (int i = 0; i < numQueries; i++) {
String query =
StringUtils.format(
"SELECT * FROM %s WHERE action LIKE 'login%%' ORDER BY timestamp ASC", TEST_INDEX);

JSONObject response = executeQueryWithoutFetchSize(query);

assertTrue("Query should succeed", response.has("datarows"));
JSONArray dataRows = response.getJSONArray("datarows");
assertThat("Should return results", dataRows.length(), greaterThan(0));
assertFalse("Should not have cursor for non-paginated query", response.has("cursor"));
}

int currentPitCount = getCurrentPitCount();
int leakedPits = currentPitCount - baselinePitCount;

assertThat("No PITs should leak after fix", leakedPits, equalTo(0));
}

@Test
public void testPitManagedProperlyWithFetchSize() throws IOException {
int baselinePitCount = getCurrentPitCount();

String query =
StringUtils.format(
"SELECT * FROM %s WHERE action LIKE 'login%%' ORDER BY timestamp ASC", TEST_INDEX);

JSONObject response = executeQueryWithFetchSize(query, 2);

assertTrue("Should have cursor with fetch_size", response.has("cursor"));
String cursor = response.getString("cursor");

JSONObject closeResponse = executeCursorCloseQuery(cursor);
assertTrue("Cursor close should succeed", closeResponse.getBoolean("succeeded"));

int finalPitCount = getCurrentPitCount();

assertThat(
"PIT should be cleaned up after cursor close", finalPitCount, equalTo(baselinePitCount));
}

@Test
public void testCompareV1AndV2EnginePitBehavior() throws IOException {
int baselinePitCount = getCurrentPitCount();

String v1Query =
StringUtils.format(
"SELECT * FROM %s WHERE action LIKE 'login%%' ORDER BY timestamp ASC", TEST_INDEX);

JSONObject v1Response = executeQueryWithoutFetchSize(v1Query);
int afterV1PitCount = getCurrentPitCount();
int v1Leaked = afterV1PitCount - baselinePitCount;

String v2Query =
StringUtils.format(
"SELECT * FROM `%s` WHERE action LIKE 'login%%' ORDER BY timestamp ASC", TEST_INDEX);

JSONObject v2Response = executeQueryWithoutFetchSize(v2Query);
int afterV2PitCount = getCurrentPitCount();
int v2Leaked = afterV2PitCount - afterV1PitCount;

assertTrue("V1 should return results", v1Response.has("datarows"));
assertTrue("V2 should return results", v2Response.has("datarows"));

assertThat("V1 Legacy SQL should not leak PITs", v1Leaked, equalTo(0));
assertThat("V2 SQL should not leak PITs", v2Leaked, equalTo(0));
}

private JSONObject executeQueryWithoutFetchSize(String query) throws IOException {
Request sqlRequest = new Request("POST", "/_plugins/_sql?format=jdbc");
sqlRequest.setJsonEntity(String.format("{\"query\": \"%s\"}", query));

Response response = client().performRequest(sqlRequest);
return new JSONObject(TestUtils.getResponseBody(response));
}

private JSONObject executeQueryWithFetchSize(String query, int fetchSize) throws IOException {
Request sqlRequest = new Request("POST", "/_plugins/_sql?format=jdbc");
sqlRequest.setJsonEntity(
String.format("{\"query\": \"%s\", \"fetch_size\": %d}", query, fetchSize));

Response response = client().performRequest(sqlRequest);
return new JSONObject(TestUtils.getResponseBody(response));
}

private int getCurrentPitCount() throws IOException {
Request statsRequest = new Request("GET", PIT_STATS_ENDPOINT);
Response response = client().performRequest(statsRequest);
JSONObject stats = new JSONObject(TestUtils.getResponseBody(response));

if (!stats.has("nodes")) {
return 0;
}

int totalPits = 0;
JSONObject nodes = stats.getJSONObject("nodes");
for (String nodeId : nodes.keySet()) {
Object pitValue =
stats.optQuery("/nodes/" + nodeId + "/indices/search/point_in_time_current");
if (pitValue instanceof Number) {
totalPits += ((Number) pitValue).intValue();
}
}

return totalPits;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.sql.legacy.executor.format;

import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import org.apache.logging.log4j.LogManager;
Expand All @@ -21,11 +22,12 @@
import org.opensearch.sql.legacy.exception.SqlParseException;
import org.opensearch.sql.legacy.executor.QueryActionElasticExecutor;
import org.opensearch.sql.legacy.executor.RestExecutor;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.legacy.pit.PointInTimeHandler;
import org.opensearch.sql.legacy.pit.PointInTimeHandlerImpl;
import org.opensearch.sql.legacy.query.DefaultQueryAction;
import org.opensearch.sql.legacy.query.QueryAction;
import org.opensearch.sql.legacy.query.SqlOpenSearchRequestBuilder;
import org.opensearch.sql.legacy.query.join.BackOffRetryStrategy;
import org.opensearch.transport.client.Client;

Expand All @@ -36,7 +38,7 @@ public class PrettyFormatRestExecutor implements RestExecutor {
private final String format;

public PrettyFormatRestExecutor(String format) {
this.format = format.toLowerCase();
this.format = Objects.requireNonNull(format, "Format cannot be null").toLowerCase(Locale.ROOT);
}

/** Execute the QueryAction and return the REST response using the channel. */
Expand Down Expand Up @@ -72,61 +74,98 @@ public String execute(Client client, Map<String, String> params, QueryAction que
Object queryResult = QueryActionElasticExecutor.executeAnyAction(client, queryAction);
protocol = new Protocol(client, queryAction, queryResult, format, Cursor.NULL_CURSOR);
}
} catch (SqlParseException e) {
LOG.warn("SQL parsing error: {}", e.getMessage(), e);
protocol = new Protocol(e);
} catch (OpenSearchException e) {
LOG.warn("An error occurred in OpenSearch engine: {}", e.getDetailedMessage(), e);
protocol = new Protocol(e);
} catch (Exception e) {
if (e instanceof OpenSearchException) {
LOG.warn(
"An error occurred in OpenSearch engine: "
+ ((OpenSearchException) e).getDetailedMessage(),
e);
} else {
LOG.warn("Error happened in pretty formatter", e);
}
LOG.warn("Error happened in pretty formatter", e);
protocol = new Protocol(e);
}

return protocol.format();
}

/**
* QueryActionElasticExecutor.executeAnyAction() returns SearchHits inside SearchResponse. In
* order to get scroll ID if any, we need to execute DefaultQueryAction ourselves for
* SearchResponse.
* Builds protocol for default query execution.
*
* <p>Routes to pagination or non-pagination execution based on fetch_size parameter.
*/
private Protocol buildProtocolForDefaultQuery(Client client, DefaultQueryAction queryAction)
throws SqlParseException {

PointInTimeHandler pit = null;
SearchResponse response;
SqlOpenSearchRequestBuilder sqlOpenSearchRequestBuilder = queryAction.explain();
queryAction.explain();

pit = new PointInTimeHandlerImpl(client, queryAction.getSelect().getIndexArr());
Integer fetchSize = queryAction.getSqlRequest().fetchSize();
if (fetchSize != null && fetchSize > 0) {
return buildProtocolWithPagination(client, queryAction, fetchSize);
} else {
return buildProtocolWithoutPagination(client, queryAction);
}
}

/** Executes query with pagination support using Point-in-Time (PIT). */
private Protocol buildProtocolWithPagination(
Client client, DefaultQueryAction queryAction, Integer fetchSize) {

PointInTimeHandler pit =
new PointInTimeHandlerImpl(client, queryAction.getSelect().getIndexArr());
pit.create();

try {
SearchRequestBuilder searchRequest = queryAction.getRequestBuilder();
searchRequest.setPointInTime(new PointInTimeBuilder(pit.getPitId()));
SearchResponse response = searchRequest.get();

if (shouldCreateCursor(response, queryAction, fetchSize)) {
DefaultCursor cursor = createCursorWithPit(pit, response, queryAction, fetchSize);
return new Protocol(client, queryAction, response.getHits(), format, cursor);
} else {
pit.delete();
return new Protocol(client, queryAction, response.getHits(), format, Cursor.NULL_CURSOR);
}
} catch (RuntimeException e) {
try {
pit.delete();
} catch (RuntimeException deleteException) {
LOG.error("Failed to delete PIT", deleteException);
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
}
throw e;
}
}

private Protocol buildProtocolWithoutPagination(Client client, DefaultQueryAction queryAction) {
SearchRequestBuilder searchRequest = queryAction.getRequestBuilder();
searchRequest.setPointInTime(new PointInTimeBuilder(pit.getPitId()));
response = searchRequest.get();
SearchResponse response = searchRequest.get();
return new Protocol(client, queryAction, response.getHits(), format, Cursor.NULL_CURSOR);
}

Protocol protocol;
if (isDefaultCursor(response, queryAction)) {
DefaultCursor defaultCursor = new DefaultCursor();
defaultCursor.setLimit(queryAction.getSelect().getRowCount());
defaultCursor.setFetchSize(queryAction.getSqlRequest().fetchSize());

defaultCursor.setPitId(pit.getPitId());
defaultCursor.setSearchSourceBuilder(queryAction.getRequestBuilder().request().source());
defaultCursor.setSortFields(
private DefaultCursor createCursorWithPit(
PointInTimeHandler pit,
SearchResponse response,
DefaultQueryAction queryAction,
Integer fetchSize) {
DefaultCursor cursor = new DefaultCursor();
cursor.setLimit(queryAction.getSelect().getRowCount());
cursor.setFetchSize(fetchSize);
cursor.setPitId(pit.getPitId());
cursor.setSearchSourceBuilder(queryAction.getRequestBuilder().request().source());

if (response.getHits().getHits().length > 0) {
cursor.setSortFields(
response.getHits().getAt(response.getHits().getHits().length - 1).getSortValues());

protocol = new Protocol(client, queryAction, response.getHits(), format, defaultCursor);
} else {
protocol = new Protocol(client, queryAction, response.getHits(), format, Cursor.NULL_CURSOR);
}

return protocol;
return cursor;
}

protected boolean isDefaultCursor(SearchResponse searchResponse, DefaultQueryAction queryAction) {
return queryAction.getSqlRequest().fetchSize() != 0
&& Objects.requireNonNull(searchResponse.getHits().getTotalHits()).value()
>= queryAction.getSqlRequest().fetchSize();
protected boolean shouldCreateCursor(
SearchResponse searchResponse, DefaultQueryAction queryAction, Integer fetchSize) {
return fetchSize != null
&& searchResponse.getHits() != null
&& Objects.requireNonNull(searchResponse.getHits().getTotalHits()).value() >= fetchSize;
}
}
Loading
Loading