diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/PointInTimeLeakIT.java b/integ-test/src/test/java/org/opensearch/sql/legacy/PointInTimeLeakIT.java new file mode 100644 index 00000000000..ab0f196ce33 --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/PointInTimeLeakIT.java @@ -0,0 +1,173 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.legacy; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; + +import java.io.IOException; +import org.json.JSONArray; +import org.json.JSONObject; +import org.junit.Before; +import org.junit.Test; +import org.opensearch.client.Request; +import org.opensearch.client.Response; +import org.opensearch.client.ResponseException; +import org.opensearch.sql.legacy.utils.StringUtils; + +/** + * Integration test verifying PIT contexts are created only when needed and properly cleaned up. + * + * @see Issue #5002 + */ +public class PointInTimeLeakIT extends SQLIntegTestCase { + + private static final String TEST_INDEX = "test-logs-2025.01.01"; + private static final String PIT_STATS_ENDPOINT = + "/_nodes/stats/indices/search?filter_path=nodes.*.indices.search.point_in_time_current"; + + @Before + public void setUpTestIndex() throws IOException { + try { + executeRequest(new Request("DELETE", "/" + TEST_INDEX)); + } catch (ResponseException e) { + if (e.getResponse().getStatusLine().getStatusCode() != 404) { + throw e; + } + } + + Request createIndex = new Request("PUT", "/" + TEST_INDEX); + createIndex.setJsonEntity( + "{ \"mappings\": { \"properties\": { \"action\": {\"type\": \"text\", \"fields\":" + + " {\"keyword\": {\"type\": \"keyword\"}}}, \"timestamp\": {\"type\": \"date\"} " + + " } }}"); + executeRequest(createIndex); + + Request bulkRequest = new Request("POST", "/" + TEST_INDEX + "/_bulk"); + bulkRequest.addParameter("refresh", "true"); + bulkRequest.setJsonEntity( + "{\"index\":{}}\n" + + "{\"action\":\"login_success\",\"timestamp\":\"2025-01-01T10:00:00Z\"}\n" + + "{\"index\":{}}\n" + + "{\"action\":\"login_success\",\"timestamp\":\"2025-01-01T10:01:00Z\"}\n" + + "{\"index\":{}}\n" + + "{\"action\":\"login_failed\",\"timestamp\":\"2025-01-01T10:02:00Z\"}\n"); + executeRequest(bulkRequest); + } + + @Test + public void testNoPitLeakWithoutFetchSize() throws IOException, InterruptedException { + int baselinePitCount = getCurrentPitCount(); + + int numQueries = 10; + + for (int i = 0; i < numQueries; i++) { + String query = + StringUtils.format( + "SELECT * FROM %s WHERE action LIKE 'login%%' ORDER BY timestamp ASC", TEST_INDEX); + + JSONObject response = executeQueryWithoutFetchSize(query); + + assertTrue("Query should succeed", response.has("datarows")); + JSONArray dataRows = response.getJSONArray("datarows"); + assertThat("Should return results", dataRows.length(), greaterThan(0)); + assertFalse("Should not have cursor for non-paginated query", response.has("cursor")); + } + + int currentPitCount = getCurrentPitCount(); + int leakedPits = currentPitCount - baselinePitCount; + + assertThat("No PITs should leak after fix", leakedPits, equalTo(0)); + } + + @Test + public void testPitManagedProperlyWithFetchSize() throws IOException { + int baselinePitCount = getCurrentPitCount(); + + String query = + StringUtils.format( + "SELECT * FROM %s WHERE action LIKE 'login%%' ORDER BY timestamp ASC", TEST_INDEX); + + JSONObject response = executeQueryWithFetchSize(query, 2); + + assertTrue("Should have cursor with fetch_size", response.has("cursor")); + String cursor = response.getString("cursor"); + + JSONObject closeResponse = executeCursorCloseQuery(cursor); + assertTrue("Cursor close should succeed", closeResponse.getBoolean("succeeded")); + + int finalPitCount = getCurrentPitCount(); + + assertThat( + "PIT should be cleaned up after cursor close", finalPitCount, equalTo(baselinePitCount)); + } + + @Test + public void testCompareV1AndV2EnginePitBehavior() throws IOException { + int baselinePitCount = getCurrentPitCount(); + + String v1Query = + StringUtils.format( + "SELECT * FROM %s WHERE action LIKE 'login%%' ORDER BY timestamp ASC", TEST_INDEX); + + JSONObject v1Response = executeQueryWithoutFetchSize(v1Query); + int afterV1PitCount = getCurrentPitCount(); + int v1Leaked = afterV1PitCount - baselinePitCount; + + String v2Query = + StringUtils.format( + "SELECT * FROM `%s` WHERE action LIKE 'login%%' ORDER BY timestamp ASC", TEST_INDEX); + + JSONObject v2Response = executeQueryWithoutFetchSize(v2Query); + int afterV2PitCount = getCurrentPitCount(); + int v2Leaked = afterV2PitCount - afterV1PitCount; + + assertTrue("V1 should return results", v1Response.has("datarows")); + assertTrue("V2 should return results", v2Response.has("datarows")); + + assertThat("V1 Legacy SQL should not leak PITs", v1Leaked, equalTo(0)); + assertThat("V2 SQL should not leak PITs", v2Leaked, equalTo(0)); + } + + private JSONObject executeQueryWithoutFetchSize(String query) throws IOException { + Request sqlRequest = new Request("POST", "/_plugins/_sql?format=jdbc"); + sqlRequest.setJsonEntity(String.format("{\"query\": \"%s\"}", query)); + + Response response = client().performRequest(sqlRequest); + return new JSONObject(TestUtils.getResponseBody(response)); + } + + private JSONObject executeQueryWithFetchSize(String query, int fetchSize) throws IOException { + Request sqlRequest = new Request("POST", "/_plugins/_sql?format=jdbc"); + sqlRequest.setJsonEntity( + String.format("{\"query\": \"%s\", \"fetch_size\": %d}", query, fetchSize)); + + Response response = client().performRequest(sqlRequest); + return new JSONObject(TestUtils.getResponseBody(response)); + } + + private int getCurrentPitCount() throws IOException { + Request statsRequest = new Request("GET", PIT_STATS_ENDPOINT); + Response response = client().performRequest(statsRequest); + JSONObject stats = new JSONObject(TestUtils.getResponseBody(response)); + + if (!stats.has("nodes")) { + return 0; + } + + int totalPits = 0; + JSONObject nodes = stats.getJSONObject("nodes"); + for (String nodeId : nodes.keySet()) { + Object pitValue = + stats.optQuery("/nodes/" + nodeId + "/indices/search/point_in_time_current"); + if (pitValue instanceof Number) { + totalPits += ((Number) pitValue).intValue(); + } + } + + return totalPits; + } +} diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutor.java index 9f613f68c33..8ef4b1396a0 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutor.java @@ -5,6 +5,7 @@ package org.opensearch.sql.legacy.executor.format; +import java.util.Locale; import java.util.Map; import java.util.Objects; import org.apache.logging.log4j.LogManager; @@ -21,11 +22,12 @@ import org.opensearch.sql.legacy.exception.SqlParseException; import org.opensearch.sql.legacy.executor.QueryActionElasticExecutor; import org.opensearch.sql.legacy.executor.RestExecutor; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.legacy.pit.PointInTimeHandler; import org.opensearch.sql.legacy.pit.PointInTimeHandlerImpl; import org.opensearch.sql.legacy.query.DefaultQueryAction; import org.opensearch.sql.legacy.query.QueryAction; -import org.opensearch.sql.legacy.query.SqlOpenSearchRequestBuilder; import org.opensearch.sql.legacy.query.join.BackOffRetryStrategy; import org.opensearch.transport.client.Client; @@ -36,7 +38,7 @@ public class PrettyFormatRestExecutor implements RestExecutor { private final String format; public PrettyFormatRestExecutor(String format) { - this.format = format.toLowerCase(); + this.format = Objects.requireNonNull(format, "Format cannot be null").toLowerCase(Locale.ROOT); } /** Execute the QueryAction and return the REST response using the channel. */ @@ -72,15 +74,14 @@ public String execute(Client client, Map params, QueryAction que Object queryResult = QueryActionElasticExecutor.executeAnyAction(client, queryAction); protocol = new Protocol(client, queryAction, queryResult, format, Cursor.NULL_CURSOR); } + } catch (SqlParseException e) { + LOG.warn("SQL parsing error: {}", e.getMessage(), e); + protocol = new Protocol(e); + } catch (OpenSearchException e) { + LOG.warn("An error occurred in OpenSearch engine: {}", e.getDetailedMessage(), e); + protocol = new Protocol(e); } catch (Exception e) { - if (e instanceof OpenSearchException) { - LOG.warn( - "An error occurred in OpenSearch engine: " - + ((OpenSearchException) e).getDetailedMessage(), - e); - } else { - LOG.warn("Error happened in pretty formatter", e); - } + LOG.warn("Error happened in pretty formatter", e); protocol = new Protocol(e); } @@ -88,45 +89,83 @@ public String execute(Client client, Map params, QueryAction que } /** - * QueryActionElasticExecutor.executeAnyAction() returns SearchHits inside SearchResponse. In - * order to get scroll ID if any, we need to execute DefaultQueryAction ourselves for - * SearchResponse. + * Builds protocol for default query execution. + * + *

Routes to pagination or non-pagination execution based on fetch_size parameter. */ private Protocol buildProtocolForDefaultQuery(Client client, DefaultQueryAction queryAction) throws SqlParseException { - PointInTimeHandler pit = null; - SearchResponse response; - SqlOpenSearchRequestBuilder sqlOpenSearchRequestBuilder = queryAction.explain(); + queryAction.explain(); - pit = new PointInTimeHandlerImpl(client, queryAction.getSelect().getIndexArr()); + Integer fetchSize = queryAction.getSqlRequest().fetchSize(); + if (fetchSize != null && fetchSize > 0) { + return buildProtocolWithPagination(client, queryAction, fetchSize); + } else { + return buildProtocolWithoutPagination(client, queryAction); + } + } + + /** Executes query with pagination support using Point-in-Time (PIT). */ + private Protocol buildProtocolWithPagination( + Client client, DefaultQueryAction queryAction, Integer fetchSize) { + + PointInTimeHandler pit = + new PointInTimeHandlerImpl(client, queryAction.getSelect().getIndexArr()); pit.create(); + + try { + SearchRequestBuilder searchRequest = queryAction.getRequestBuilder(); + searchRequest.setPointInTime(new PointInTimeBuilder(pit.getPitId())); + SearchResponse response = searchRequest.get(); + + if (shouldCreateCursor(response, queryAction, fetchSize)) { + DefaultCursor cursor = createCursorWithPit(pit, response, queryAction, fetchSize); + return new Protocol(client, queryAction, response.getHits(), format, cursor); + } else { + pit.delete(); + return new Protocol(client, queryAction, response.getHits(), format, Cursor.NULL_CURSOR); + } + } catch (RuntimeException e) { + try { + pit.delete(); + } catch (RuntimeException deleteException) { + LOG.error("Failed to delete PIT", deleteException); + Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); + } + throw e; + } + } + + private Protocol buildProtocolWithoutPagination(Client client, DefaultQueryAction queryAction) { SearchRequestBuilder searchRequest = queryAction.getRequestBuilder(); - searchRequest.setPointInTime(new PointInTimeBuilder(pit.getPitId())); - response = searchRequest.get(); + SearchResponse response = searchRequest.get(); + return new Protocol(client, queryAction, response.getHits(), format, Cursor.NULL_CURSOR); + } - Protocol protocol; - if (isDefaultCursor(response, queryAction)) { - DefaultCursor defaultCursor = new DefaultCursor(); - defaultCursor.setLimit(queryAction.getSelect().getRowCount()); - defaultCursor.setFetchSize(queryAction.getSqlRequest().fetchSize()); - - defaultCursor.setPitId(pit.getPitId()); - defaultCursor.setSearchSourceBuilder(queryAction.getRequestBuilder().request().source()); - defaultCursor.setSortFields( + private DefaultCursor createCursorWithPit( + PointInTimeHandler pit, + SearchResponse response, + DefaultQueryAction queryAction, + Integer fetchSize) { + DefaultCursor cursor = new DefaultCursor(); + cursor.setLimit(queryAction.getSelect().getRowCount()); + cursor.setFetchSize(fetchSize); + cursor.setPitId(pit.getPitId()); + cursor.setSearchSourceBuilder(queryAction.getRequestBuilder().request().source()); + + if (response.getHits().getHits().length > 0) { + cursor.setSortFields( response.getHits().getAt(response.getHits().getHits().length - 1).getSortValues()); - - protocol = new Protocol(client, queryAction, response.getHits(), format, defaultCursor); - } else { - protocol = new Protocol(client, queryAction, response.getHits(), format, Cursor.NULL_CURSOR); } - return protocol; + return cursor; } - protected boolean isDefaultCursor(SearchResponse searchResponse, DefaultQueryAction queryAction) { - return queryAction.getSqlRequest().fetchSize() != 0 - && Objects.requireNonNull(searchResponse.getHits().getTotalHits()).value() - >= queryAction.getSqlRequest().fetchSize(); + protected boolean shouldCreateCursor( + SearchResponse searchResponse, DefaultQueryAction queryAction, Integer fetchSize) { + return fetchSize != null + && searchResponse.getHits() != null + && Objects.requireNonNull(searchResponse.getHits().getTotalHits()).value() >= fetchSize; } } diff --git a/legacy/src/test/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutorPitTest.java b/legacy/src/test/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutorPitTest.java new file mode 100644 index 00000000000..ac9178ec5d5 --- /dev/null +++ b/legacy/src/test/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutorPitTest.java @@ -0,0 +1,127 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.legacy.executor.format; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; +import org.apache.lucene.search.TotalHits; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.opensearch.action.search.SearchRequestBuilder; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.search.SearchHit; +import org.opensearch.search.SearchHits; +import org.opensearch.search.builder.PointInTimeBuilder; +import org.opensearch.sql.legacy.esdomain.LocalClusterState; +import org.opensearch.sql.legacy.query.DefaultQueryAction; +import org.opensearch.sql.legacy.query.SqlOpenSearchRequestBuilder; +import org.opensearch.sql.legacy.request.SqlRequest; +import org.opensearch.sql.opensearch.setting.OpenSearchSettings; +import org.opensearch.transport.client.Client; + +/** Unit tests for PIT lifecycle management in PrettyFormatRestExecutor. */ +@RunWith(MockitoJUnitRunner.Silent.class) +public class PrettyFormatRestExecutorPitTest { + + @Mock private Client client; + @Mock private DefaultQueryAction queryAction; + @Mock private SqlRequest sqlRequest; + @Mock private SqlOpenSearchRequestBuilder requestBuilder; + @Mock private SearchRequestBuilder searchRequestBuilder; + @Mock private SearchResponse searchResponse; + @Mock private SearchHit searchHit; + + private PrettyFormatRestExecutor executor; + private Map params; + + @Before + public void setUp() throws Exception { + OpenSearchSettings settings = mock(OpenSearchSettings.class); + LocalClusterState.state().setPluginSettings(settings); + + when(queryAction.getSqlRequest()).thenReturn(sqlRequest); + when(queryAction.explain()).thenReturn(requestBuilder); + when(queryAction.getRequestBuilder()).thenReturn(searchRequestBuilder); + when(searchRequestBuilder.get()).thenReturn(searchResponse); + + SearchHits hits = + new SearchHits( + new SearchHit[] {searchHit}, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0F); + when(searchResponse.getHits()).thenReturn(hits); + + executor = new PrettyFormatRestExecutor("jdbc"); + params = new HashMap<>(); + } + + @Test + public void testNoPitCreatedWhenFetchSizeIsZero() throws Exception { + when(sqlRequest.fetchSize()).thenReturn(0); + + executor.execute(client, params, queryAction); + + verify(searchRequestBuilder, never()).setPointInTime(any(PointInTimeBuilder.class)); + verify(searchRequestBuilder, times(1)).get(); + } + + @Test + public void testNoPitCreatedWhenFetchSizeNotSpecified() throws Exception { + when(sqlRequest.fetchSize()).thenReturn(null); // Simulates fetch_size not in request + + executor.execute(client, params, queryAction); + + verify(searchRequestBuilder, never()).setPointInTime(any(PointInTimeBuilder.class)); + } + + @Test + public void testFetchSizeSpecifiedButResultsFitInOnePage() throws Exception { + when(sqlRequest.fetchSize()).thenReturn(100); + + SearchHits hits = + new SearchHits( + new SearchHit[] {searchHit}, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0F); + when(searchResponse.getHits()).thenReturn(hits); + + String result = executor.execute(client, params, queryAction); + + assertNotNull("Query should execute successfully", result); + } + + @Test + public void testCursorCreatedWhenResultsExceedFetchSize() throws Exception { + when(sqlRequest.fetchSize()).thenReturn(5); + + SearchHits hits = + new SearchHits( + new SearchHit[] {searchHit}, new TotalHits(10, TotalHits.Relation.EQUAL_TO), 1.0F); + when(searchResponse.getHits()).thenReturn(hits); + + String result = executor.execute(client, params, queryAction); + + assertNotNull("Query should execute successfully", result); + } + + @Test + public void testQueryExecutionSucceedsWithVariousFetchSizes() throws Exception { + int[] fetchSizes = {0, 100, 1}; + + for (int fetchSize : fetchSizes) { + when(sqlRequest.fetchSize()).thenReturn(fetchSize); + String result = executor.execute(client, params, queryAction); + assertNotNull("Result should not be null for fetchSize=" + fetchSize, result); + } + } +} diff --git a/legacy/src/test/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutorTest.java b/legacy/src/test/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutorTest.java index de51bd448e6..d51aa220c9c 100644 --- a/legacy/src/test/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutorTest.java +++ b/legacy/src/test/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutorTest.java @@ -21,52 +21,50 @@ import org.opensearch.search.SearchHits; import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.query.DefaultQueryAction; -import org.opensearch.sql.legacy.request.SqlRequest; import org.opensearch.sql.opensearch.setting.OpenSearchSettings; +/** + * Unit tests for shouldCreateCursor() method. + * + *

For PIT lifecycle tests, see {@link PrettyFormatRestExecutorPitTest}. + */ @RunWith(MockitoJUnitRunner.class) public class PrettyFormatRestExecutorTest { @Mock private SearchResponse searchResponse; @Mock private SearchHit searchHit; @Mock private DefaultQueryAction queryAction; - @Mock private SqlRequest sqlRequest; private PrettyFormatRestExecutor executor; @Before public void setUp() { OpenSearchSettings settings = mock(OpenSearchSettings.class); LocalClusterState.state().setPluginSettings(settings); - when(queryAction.getSqlRequest()).thenReturn(sqlRequest); executor = new PrettyFormatRestExecutor("jdbc"); } @Test - public void testIsDefaultCursor_fetchSizeZero() { - when(sqlRequest.fetchSize()).thenReturn(0); - - assertFalse(executor.isDefaultCursor(searchResponse, queryAction)); + public void testShouldCreateCursor_fetchSizeZero() { + assertFalse(executor.shouldCreateCursor(searchResponse, queryAction, 0)); } @Test - public void testIsDefaultCursor_totalHitsLessThanFetchSize() { - when(sqlRequest.fetchSize()).thenReturn(10); + public void testShouldCreateCursor_totalHitsLessThanFetchSize() { when(searchResponse.getHits()) .thenReturn( new SearchHits( new SearchHit[] {searchHit}, new TotalHits(5, TotalHits.Relation.EQUAL_TO), 1.0F)); - assertFalse(executor.isDefaultCursor(searchResponse, queryAction)); + assertFalse(executor.shouldCreateCursor(searchResponse, queryAction, 10)); } @Test - public void testIsDefaultCursor_totalHitsGreaterThanOrEqualToFetchSize() { - when(sqlRequest.fetchSize()).thenReturn(5); + public void testShouldCreateCursor_totalHitsGreaterThanOrEqualToFetchSize() { when(searchResponse.getHits()) .thenReturn( new SearchHits( new SearchHit[] {searchHit}, new TotalHits(5, TotalHits.Relation.EQUAL_TO), 1.0F)); - assertTrue(executor.isDefaultCursor(searchResponse, queryAction)); + assertTrue(executor.shouldCreateCursor(searchResponse, queryAction, 5)); } }