-
Notifications
You must be signed in to change notification settings - Fork 28
feat: support partitioned queries through SQL statements #714
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -161,6 +161,49 @@ var propertyReadOnlyStaleness = createConnectionProperty( | |
| connectionstate.ConvertReadOnlyStaleness, | ||
| ) | ||
|
|
||
| var propertyAutoPartitionMode = createConnectionProperty( | ||
| "auto_partition_mode", | ||
| "Execute all queries on this connection as partitioned queries. "+ | ||
| "Executing a query that cannot be partitioned will fail. "+ | ||
| "Executing a query in a read/write transaction will also fail.", | ||
| false, | ||
| false, | ||
| nil, | ||
| connectionstate.ContextUser, | ||
| connectionstate.ConvertBool, | ||
| ) | ||
| var propertyDataBoostEnabled = createConnectionProperty( | ||
| "data_boost_enabled", | ||
| "Enable data boost for all partitioned queries that are executed by this connection. "+ | ||
| "This setting is only used for partitioned queries and is ignored by all other statements. "+ | ||
| "Either set `auto_partition_query=true` or execute a query with `RUN PARTITIONED QUERY SELECT ... FROM ...` "+ | ||
| "to execute a query as a partitioned query.", | ||
| false, | ||
| false, | ||
| nil, | ||
| connectionstate.ContextUser, | ||
| connectionstate.ConvertBool, | ||
| ) | ||
| var propertyMaxPartitions = createConnectionProperty( | ||
| "max_partitions", | ||
| "The max partitions hint value to use for partitioned queries. "+ | ||
| "Set to 0 if you do not want to specify a hint.", | ||
| 0, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| false, | ||
| nil, | ||
| connectionstate.ContextUser, | ||
| connectionstate.ConvertInt64, | ||
| ) | ||
| var propertyMaxPartitionedParallelism = createConnectionProperty( | ||
| "max_partitioned_parallelism", | ||
| "The maximum number of workers to use to read data from partitioned queries.", | ||
| 0, | ||
| false, | ||
| nil, | ||
| connectionstate.ContextUser, | ||
| connectionstate.ConvertInt, | ||
| ) | ||
|
|
||
| var propertyAutoBatchDml = createConnectionProperty( | ||
| "auto_batch_dml", | ||
| "Automatically buffer DML statements that are executed on this connection and execute them as one batch "+ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -43,7 +43,11 @@ func parseStatement(parser *StatementParser, keyword, query string) (ParsedState | |
| stmt = &ParsedStartBatchStatement{} | ||
| } | ||
| } else if isRunStatementKeyword(keyword) { | ||
| stmt = &ParsedRunBatchStatement{} | ||
| if isRunBatch(parser, query) { | ||
| stmt = &ParsedRunBatchStatement{} | ||
| } else if isRunPartitionedQuery(parser, query) { | ||
| stmt = &ParsedRunPartitionedQueryStatement{} | ||
| } | ||
| } else if isAbortStatementKeyword(keyword) { | ||
| stmt = &ParsedAbortBatchStatement{} | ||
| } else if isBeginStatementKeyword(keyword) { | ||
|
|
@@ -55,6 +59,9 @@ func parseStatement(parser *StatementParser, keyword, query string) (ParsedState | |
| } else { | ||
| return nil, nil | ||
| } | ||
| if stmt == nil { | ||
| return nil, nil | ||
| } | ||
| if err := stmt.parse(parser, query); err != nil { | ||
| return nil, err | ||
| } | ||
|
|
@@ -98,6 +105,36 @@ func isStartTransaction(parser *StatementParser, query string) bool { | |
| return false | ||
| } | ||
|
|
||
| func isRunBatch(parser *StatementParser, query string) bool { | ||
| sp := &simpleParser{sql: []byte(query), statementParser: parser} | ||
| if !sp.eatKeyword("run") { | ||
| return false | ||
| } | ||
| if !sp.hasMoreTokens() { | ||
| // START is a synonym for START TRANSACTION | ||
| return false | ||
| } | ||
| if sp.eatKeyword("batch") { | ||
| return true | ||
| } | ||
| return false | ||
| } | ||
|
|
||
| func isRunPartitionedQuery(parser *StatementParser, query string) bool { | ||
| sp := &simpleParser{sql: []byte(query), statementParser: parser} | ||
| if !sp.eatKeyword("run") { | ||
| return false | ||
| } | ||
| if !sp.hasMoreTokens() { | ||
| // START is a synonym for START TRANSACTION | ||
| return false | ||
| } | ||
| if sp.eatKeyword("partitioned") { | ||
| return true | ||
| } | ||
| return false | ||
| } | ||
|
Comment on lines
+108
to
+136
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The functions Additionally, the comment func isRunCommand(parser *StatementParser, query string, command string) bool {
sp := &simpleParser{sql: []byte(query), statementParser: parser}
if !sp.eatKeyword("run") {
return false
}
if !sp.hasMoreTokens() {
return false
}
return sp.eatKeyword(command)
}
func isRunBatch(parser *StatementParser, query string) bool {
return isRunCommand(parser, query, "batch")
}
func isRunPartitionedQuery(parser *StatementParser, query string) bool {
return isRunCommand(parser, query, "partitioned")
} |
||
|
|
||
| // ParsedShowStatement is a statement of the form | ||
| // SHOW [VARIABLE] [my_extension.]my_property | ||
| type ParsedShowStatement struct { | ||
|
|
@@ -509,6 +546,34 @@ func (s *ParsedAbortBatchStatement) parse(parser *StatementParser, query string) | |
| return nil | ||
| } | ||
|
|
||
| type ParsedRunPartitionedQueryStatement struct { | ||
| query string | ||
| Statement string | ||
| } | ||
|
|
||
| func (s *ParsedRunPartitionedQueryStatement) Name() string { | ||
| return "RUN PARTITIONED QUERY" | ||
| } | ||
|
|
||
| func (s *ParsedRunPartitionedQueryStatement) Query() string { | ||
| return s.query | ||
| } | ||
|
|
||
| func (s *ParsedRunPartitionedQueryStatement) parse(parser *StatementParser, query string) error { | ||
| // Parse a statement of the form | ||
| // RUN PARTITIONED QUERY <sql> | ||
| sp := &simpleParser{sql: []byte(query), statementParser: parser} | ||
| if !sp.eatKeywords([]string{"RUN", "PARTITIONED", "QUERY"}) { | ||
| return status.Error(codes.InvalidArgument, "statement does not start with RUN PARTITIONED QUERY") | ||
| } | ||
| if !sp.hasMoreTokens() { | ||
| return status.Errorf(codes.InvalidArgument, "missing statement after RUN PARTITIONED QUERY: %q", sp.sql) | ||
| } | ||
| s.Statement = query[sp.pos:] | ||
| s.query = query | ||
| return nil | ||
| } | ||
|
|
||
| type ParsedBeginStatement struct { | ||
| query string | ||
| // Identifiers contains the transaction properties that were included in the BEGIN statement. E.g. the statement | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -629,3 +629,86 @@ | |||||
| }) | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| func TestParseRunPartitionedQuery(t *testing.T) { | ||||||
| t.Parallel() | ||||||
|
|
||||||
| type test struct { | ||||||
| input string | ||||||
| want ParsedRunPartitionedQueryStatement | ||||||
| wantErr bool | ||||||
| } | ||||||
| tests := []test{ | ||||||
| { | ||||||
| input: "run partitioned query select * from my_table", | ||||||
| want: ParsedRunPartitionedQueryStatement{ | ||||||
| statement: " select * from my_table", | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The field
Suggested change
|
||||||
| query: "run partitioned query select * from my_table", | ||||||
| }, | ||||||
| }, | ||||||
| { | ||||||
| input: "run partitioned query\nselect * from my_table", | ||||||
| want: ParsedRunPartitionedQueryStatement{ | ||||||
| statement: "\nselect * from my_table", | ||||||
| query: "run partitioned query\nselect * from my_table", | ||||||
| }, | ||||||
| }, | ||||||
| { | ||||||
| input: "run partitioned query\n--comment\nselect * from my_table", | ||||||
| want: ParsedRunPartitionedQueryStatement{ | ||||||
| statement: "\n--comment\nselect * from my_table", | ||||||
| query: "run partitioned query\n--comment\nselect * from my_table", | ||||||
| }, | ||||||
| }, | ||||||
| { | ||||||
| input: "run --comment\n partitioned /* comment */ query select * from my_table", | ||||||
| want: ParsedRunPartitionedQueryStatement{ | ||||||
| statement: " select * from my_table", | ||||||
| query: "run --comment\n partitioned /* comment */ query select * from my_table", | ||||||
| }, | ||||||
| }, | ||||||
| { | ||||||
| input: "run partitioned query", | ||||||
| wantErr: true, | ||||||
| }, | ||||||
| { | ||||||
| input: "run partitioned query /* comment */", | ||||||
| wantErr: true, | ||||||
| }, | ||||||
| { | ||||||
| input: "run partitioned query -- comment\n", | ||||||
| wantErr: true, | ||||||
| }, | ||||||
| { | ||||||
| input: "run partitioned select * from my_table", | ||||||
| wantErr: true, | ||||||
| }, | ||||||
| } | ||||||
| parser, err := NewStatementParser(databasepb.DatabaseDialect_GOOGLE_STANDARD_SQL, 1000) | ||||||
| if err != nil { | ||||||
| t.Fatal(err) | ||||||
| } | ||||||
| for _, test := range tests { | ||||||
| t.Run(test.input, func(t *testing.T) { | ||||||
| sp := &simpleParser{sql: []byte(test.input), statementParser: parser} | ||||||
| keyword := strings.ToUpper(sp.readKeyword()) | ||||||
| stmt, err := parseStatement(parser, keyword, test.input) | ||||||
| if test.wantErr { | ||||||
| if err == nil { | ||||||
| t.Fatalf("parseStatement(%q) should have failed", test.input) | ||||||
| } | ||||||
| } else { | ||||||
| if err != nil { | ||||||
| t.Fatal(err) | ||||||
| } | ||||||
| runStmt, ok := stmt.(*ParsedRunPartitionedQueryStatement) | ||||||
| if !ok { | ||||||
| t.Fatalf("parseStatement(%q) should have returned a *parsedRunPartitionedQueryStatement", test.input) | ||||||
| } | ||||||
| if !reflect.DeepEqual(*runStmt, test.want) { | ||||||
| t.Errorf("parseStatement(%q) mismatch\n Got: %v\nWant: %v", test.input, *runStmt, test.want) | ||||||
| } | ||||||
| } | ||||||
| }) | ||||||
| } | ||||||
| } | ||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a typo in the description. The connection property is
auto_partition_mode, but the description refers toauto_partition_query.