Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ statement
| CREATE (OR REPLACE)? MATERIALIZED VIEW
(IF NOT EXISTS)? qualifiedName
(GRACE PERIOD interval)?
(WHEN STALE (INLINE | FAIL))?
(COMMENT string)?
(WITH properties)? AS rootQuery #createMaterializedView
| CREATE (OR REPLACE)? VIEW qualifiedName
Expand Down Expand Up @@ -1027,10 +1028,10 @@ nonReserved
| CALL | CALLED | CASCADE | CATALOG | CATALOGS | COLUMN | COLUMNS | COMMENT | COMMIT | COMMITTED | CONDITIONAL | COPARTITION | CORRESPONDING | COUNT | CURRENT
| DATA | DATE | DAY | DECLARE | DEFAULT | DEFINE | DEFINER | DENY | DESC | DESCRIPTOR | DETERMINISTIC | DISTRIBUTED | DO | DOUBLE
| ELSEIF | EMPTY | ENCODING | ERROR | EXCLUDING | EXECUTE | EXPLAIN
| FAST | FETCH | FILTER | FINAL | FIRST | FOLLOWING | FORMAT | FORWARD | FUNCTION | FUNCTIONS
| FAIL | FAST | FETCH | FILTER | FINAL | FIRST | FOLLOWING | FORMAT | FORWARD | FUNCTION | FUNCTIONS
| GRACE | GRANT | GRANTED | GRANTS | GRAPHVIZ | GROUPS
| HOUR
| IF | IGNORE | IMMEDIATE | INCLUDING | INITIAL | INPUT | INTERVAL | INVOKER | IO | ITERATE | ISOLATION
| IF | IGNORE | IMMEDIATE | INCLUDING | INITIAL | INLINE | INPUT | INTERVAL | INVOKER | IO | ITERATE | ISOLATION
| JSON
| KEEP | KEY | KEYS
| LANGUAGE | LAST | LATERAL | LEADING | LEAVE | LEVEL | LIMIT | LOCAL | LOGICAL | LOOP
Expand All @@ -1041,7 +1042,7 @@ nonReserved
| QUOTES
| RANGE | READ | REFRESH | RENAME | REPEAT | REPEATABLE | REPLACE | RESET | RESPECT | RESTRICT | RETURN | RETURNING | RETURNS | REVOKE | ROLE | ROLES | ROLLBACK | ROW | ROWS | RUNNING
| SCALAR | SCHEMA | SCHEMAS | SECOND | SECURITY | SEEK | SERIALIZABLE | SESSION | SET | SETS
| SHOW | SOME | START | STATS | SUBSET | SUBSTRING | SYSTEM
| SHOW | SOME | STALE | START | STATS | SUBSET | SUBSTRING | SYSTEM
| TABLES | TABLESAMPLE | TEXT | TEXT_STRING | TIES | TIME | TIMESTAMP | TO | TRAILING | TRANSACTION | TRUNCATE | TRY_CAST | TYPE
| UNBOUNDED | UNCOMMITTED | UNCONDITIONAL | UNIQUE | UNKNOWN | UNMATCHED | UNTIL | UPDATE | USE | USER | UTF16 | UTF32 | UTF8
| VALIDATE | VALUE | VERBOSE | VERSION | VIEW
Expand Down Expand Up @@ -1133,6 +1134,7 @@ EXECUTE: 'EXECUTE';
EXISTS: 'EXISTS';
EXPLAIN: 'EXPLAIN';
EXTRACT: 'EXTRACT';
FAIL: 'FAIL';
FALSE: 'FALSE';
FAST: 'FAST';
FETCH: 'FETCH';
Expand Down Expand Up @@ -1163,6 +1165,7 @@ IMMEDIATE: 'IMMEDIATE';
IN: 'IN';
INCLUDING: 'INCLUDING';
INITIAL: 'INITIAL';
INLINE: 'INLINE';
INNER: 'INNER';
INPUT: 'INPUT';
INSERT: 'INSERT';
Expand Down Expand Up @@ -1293,6 +1296,7 @@ SET: 'SET';
SETS: 'SETS';
SHOW: 'SHOW';
SOME: 'SOME';
STALE: 'STALE';
START: 'START';
STATS: 'STATS';
SUBSET: 'SUBSET';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public void test()
"EXISTS",
"EXPLAIN",
"EXTRACT",
"FAIL",
"FALSE",
"FAST",
"FETCH",
Expand Down Expand Up @@ -139,6 +140,7 @@ public void test()
"IN",
"INCLUDING",
"INITIAL",
"INLINE",
"INNER",
"INPUT",
"INSERT",
Expand Down Expand Up @@ -270,6 +272,7 @@ public void test()
"SHOW",
"SKIP",
"SOME",
"STALE",
"START",
"STATS",
"STRING",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.trino.metadata.ViewColumn;
import io.trino.security.AccessControl;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorMaterializedViewDefinition.WhenStaleBehavior;
import io.trino.spi.type.Type;
import io.trino.sql.PlannerContext;
import io.trino.sql.analyzer.Analysis;
Expand Down Expand Up @@ -55,6 +56,7 @@
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.StandardErrorCode.TYPE_MISMATCH;
import static io.trino.spi.connector.ConnectorCapabilities.MATERIALIZED_VIEW_GRACE_PERIOD;
import static io.trino.spi.connector.ConnectorCapabilities.MATERIALIZED_VIEW_WHEN_STALE_BEHAVIOR;
import static io.trino.sql.SqlFormatterUtil.getFormattedSql;
import static io.trino.sql.analyzer.ConstantEvaluator.evaluateConstant;
import static io.trino.sql.analyzer.SemanticExceptions.semanticException;
Expand Down Expand Up @@ -156,12 +158,21 @@ Analysis executeInternal(
return Duration.ofMillis(milliseconds);
});

Optional<WhenStaleBehavior> whenStale = statement.getWhenStaleBehavior()
.map(whenStaleBehavior -> {
if (!plannerContext.getMetadata().getConnectorCapabilities(session, catalogHandle).contains(MATERIALIZED_VIEW_WHEN_STALE_BEHAVIOR)) {
throw semanticException(NOT_SUPPORTED, statement, "Catalog '%s' does not support WHEN STALE", catalogName);
}
Comment on lines +163 to +165
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should catalogs need to handle anything in order to support it ? If it is some sort of MV property (like security mode in case of views) - It should be supported for all connectors supporting MVs right ? Or we could fail during the creation or alter phase ?

return toConnectorWhenStaleBehavior(whenStaleBehavior);
});

MaterializedViewDefinition definition = new MaterializedViewDefinition(
sql,
session.getCatalog(),
session.getSchema(),
columns,
gracePeriod,
whenStale,
statement.getComment(),
session.getIdentity(),
session.getPath().getPath().stream()
Expand All @@ -182,4 +193,12 @@ Analysis executeInternal(
plannerContext.getMetadata().createMaterializedView(session, name, definition, properties, statement.isReplace(), statement.isNotExists());
return analysis;
}

private static WhenStaleBehavior toConnectorWhenStaleBehavior(CreateMaterializedView.WhenStaleBehavior whenStale)
{
return switch (whenStale) {
case INLINE -> WhenStaleBehavior.INLINE;
case FAIL -> WhenStaleBehavior.FAIL;
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.trino.spi.connector.CatalogSchemaName;
import io.trino.spi.connector.CatalogSchemaTableName;
import io.trino.spi.connector.ConnectorMaterializedViewDefinition;
import io.trino.spi.connector.ConnectorMaterializedViewDefinition.WhenStaleBehavior;
import io.trino.spi.security.Identity;

import java.time.Duration;
Expand All @@ -31,6 +32,7 @@ public class MaterializedViewDefinition
extends ViewDefinition
{
private final Optional<Duration> gracePeriod;
private final Optional<WhenStaleBehavior> whenStaleBehavior;
private final Optional<CatalogSchemaTableName> storageTable;

public MaterializedViewDefinition(
Expand All @@ -39,6 +41,7 @@ public MaterializedViewDefinition(
Optional<String> schema,
List<ViewColumn> columns,
Optional<Duration> gracePeriod,
Optional<WhenStaleBehavior> whenStaleBehavior,
Optional<String> comment,
Identity owner,
List<CatalogSchemaName> path,
Expand All @@ -47,6 +50,7 @@ public MaterializedViewDefinition(
super(originalSql, catalog, schema, columns, comment, Optional.of(owner), path);
this.gracePeriod = requireNonNull(gracePeriod, "gracePeriod is null");
checkArgument(gracePeriod.isEmpty() || !gracePeriod.get().isNegative(), "gracePeriod cannot be negative: %s", gracePeriod);
this.whenStaleBehavior = requireNonNull(whenStaleBehavior, "whenStaleBehavior is null");
this.storageTable = requireNonNull(storageTable, "storageTable is null");
}

Expand All @@ -55,6 +59,11 @@ public Optional<Duration> getGracePeriod()
return gracePeriod;
}

public Optional<WhenStaleBehavior> getWhenStaleBehavior()
{
return whenStaleBehavior;
}

public Optional<CatalogSchemaTableName> getStorageTable()
{
return storageTable;
Expand All @@ -71,6 +80,7 @@ public ConnectorMaterializedViewDefinition toConnectorMaterializedViewDefinition
.map(column -> new ConnectorMaterializedViewDefinition.Column(column.name(), column.type(), column.comment()))
.collect(toImmutableList()),
getGracePeriod(),
whenStaleBehavior,
getComment(),
getRunAsIdentity().map(Identity::getUser),
getPath());
Expand All @@ -85,6 +95,7 @@ public String toString()
.add("schema", getSchema().orElse(null))
.add("columns", getColumns())
.add("gracePeriod", gracePeriod.orElse(null))
.add("whenStaleBehavior", whenStaleBehavior.orElse(null))
.add("comment", getComment().orElse(null))
.add("runAsIdentity", getRunAsIdentity())
.add("path", getPath())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1811,6 +1811,7 @@ private static MaterializedViewDefinition createMaterializedViewDefinition(Conne
.map(column -> new ViewColumn(column.getName(), column.getType(), Optional.empty()))
.collect(toImmutableList()),
view.getGracePeriod(),
view.getWhenStaleBehavior(),
view.getComment(),
runAsIdentity,
view.getPath(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ColumnSchema;
import io.trino.spi.connector.ConnectorMaterializedViewDefinition.WhenStaleBehavior;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.MaterializedViewFreshness;
Expand Down Expand Up @@ -2285,6 +2286,7 @@ protected Scope visitTable(Table table, Optional<Scope> scope)
if (optionalMaterializedView.isPresent()) {
MaterializedViewDefinition materializedViewDefinition = optionalMaterializedView.get();
analysis.addEmptyColumnReferencesForTable(accessControl, session.getIdentity(), name);
boolean useLogicalViewSemantics = shouldUseLogicalViewSemantics(materializedViewDefinition);
if (isMaterializedViewSufficientlyFresh(session, name, materializedViewDefinition)) {
// If materialized view is sufficiently fresh with respect to its grace period, answer the query using the storage table
QualifiedName storageName = getMaterializedViewStorageTableName(materializedViewDefinition)
Expand All @@ -2293,10 +2295,13 @@ protected Scope visitTable(Table table, Optional<Scope> scope)
checkStorageTableNotRedirected(storageTableName);
TableHandle tableHandle = metadata.getTableHandle(session, storageTableName)
.orElseThrow(() -> semanticException(INVALID_VIEW, table, "Storage table '%s' does not exist", storageTableName));
return createScopeForMaterializedView(table, name, scope, materializedViewDefinition, Optional.of(tableHandle));
return createScopeForMaterializedView(table, name, scope, materializedViewDefinition, Optional.of(tableHandle), useLogicalViewSemantics);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit concerned about this line: the new clause affects the path for fresh MVs (WHEN STALE FAIL skips analysis, while WHEN STALE INLINE runs the analysis), even though the syntax suggests it should only define behavior for stale MVs.

Is analysis of the underlying query even necessary for fresh MVs, regardless of the WHEN STALE clause? I understand that it can detect schema changes in the base tables or changes to the MV definer’s access permissions - but is that the intended behavior?

}
else if (!useLogicalViewSemantics) {
throw semanticException(VIEW_IS_STALE, table, "Materialized view '%s' is stale", name);
}
// This is a stale materialized view and should be expanded like a logical view
return createScopeForMaterializedView(table, name, scope, materializedViewDefinition, Optional.empty());
return createScopeForMaterializedView(table, name, scope, materializedViewDefinition, Optional.empty(), useLogicalViewSemantics);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of passing a useLogicalViewSemantics can we create a scope for the storage table ?

}

// This could be a reference to a logical view or a table
Expand Down Expand Up @@ -2385,6 +2390,15 @@ private boolean isMaterializedViewSufficientlyFresh(Session session, QualifiedOb
return staleness.compareTo(gracePeriod) <= 0;
}

private static boolean shouldUseLogicalViewSemantics(MaterializedViewDefinition materializedViewDefinition)
{
WhenStaleBehavior whenStale = materializedViewDefinition.getWhenStaleBehavior().orElse(WhenStaleBehavior.INLINE);
return switch (whenStale) {
case WhenStaleBehavior.INLINE -> true;
case WhenStaleBehavior.FAIL -> false;
};
}

private void checkStorageTableNotRedirected(QualifiedObjectName source)
{
metadata.getRedirectionAwareTableHandle(session, source).redirectedTableName().ifPresent(name -> {
Expand Down Expand Up @@ -2529,7 +2543,13 @@ private Scope createScopeForCommonTableExpression(Table table, Optional<Scope> s
return createAndAssignScope(table, scope, fields);
}

private Scope createScopeForMaterializedView(Table table, QualifiedObjectName name, Optional<Scope> scope, MaterializedViewDefinition view, Optional<TableHandle> storageTable)
private Scope createScopeForMaterializedView(
Table table,
QualifiedObjectName name,
Optional<Scope> scope,
MaterializedViewDefinition view,
Optional<TableHandle> storageTable,
boolean useLogicalViewSemantics)
{
return createScopeForView(
table,
Expand All @@ -2542,7 +2562,8 @@ private Scope createScopeForMaterializedView(Table table, QualifiedObjectName na
view.getPath(),
view.getColumns(),
storageTable,
true);
true,
useLogicalViewSemantics);
}

private Scope createScopeForView(Table table, QualifiedObjectName name, Optional<Scope> scope, ViewDefinition view)
Expand All @@ -2557,7 +2578,8 @@ private Scope createScopeForView(Table table, QualifiedObjectName name, Optional
view.getPath(),
view.getColumns(),
Optional.empty(),
false);
false,
true);
}

private Scope createScopeForView(
Expand All @@ -2571,7 +2593,8 @@ private Scope createScopeForView(
List<CatalogSchemaName> path,
List<ViewColumn> columns,
Optional<TableHandle> storageTable,
boolean isMaterializedView)
boolean isMaterializedView,
boolean useLogicalViewSemantics)
{
Statement statement = analysis.getStatement();
if (statement instanceof CreateView viewStatement) {
Expand All @@ -2590,18 +2613,27 @@ private Scope createScopeForView(
throw semanticException(VIEW_IS_RECURSIVE, table, "View is recursive");
}

Query query = parseView(originalSql, name, table);
if (useLogicalViewSemantics) {
Query query = parseView(originalSql, name, table);

if (!query.getFunctions().isEmpty()) {
throw semanticException(NOT_SUPPORTED, table, "View contains inline function: %s", name);
}
if (!query.getFunctions().isEmpty()) {
throw semanticException(NOT_SUPPORTED, table, "View contains inline function: %s", name);
}

analysis.registerTableForView(table, name, isMaterializedView);
RelationType descriptor = analyzeView(query, name, catalog, schema, owner, path, table);
analysis.unregisterTableForView();
analysis.registerTableForView(table, name, isMaterializedView);
RelationType descriptor = analyzeView(query, name, catalog, schema, owner, path, table);
analysis.unregisterTableForView();

checkViewStaleness(columns, descriptor.getVisibleFields(), name, table)
.ifPresent(explanation -> { throw semanticException(VIEW_IS_STALE, table, "View '%s' is stale or in invalid state: %s", name, explanation); });
checkViewStaleness(columns, descriptor.getVisibleFields(), name, table)
.ifPresent(explanation -> { throw semanticException(VIEW_IS_STALE, table, "View '%s' is stale or in invalid state: %s", name, explanation); });

if (storageTable.isEmpty()) {
analysis.registerNamedQuery(table, query);
}
}
else {
checkArgument(storageTable.isPresent(), "A storage table must be present when query analysis is skipped");
}

// Derive the type of the view from the stored definition, not from the analysis of the underlying query.
// This is needed in case the underlying table(s) changed and the query in the view now produces types that
Expand All @@ -2621,9 +2653,6 @@ private Scope createScopeForView(
List<Field> storageTableFields = analyzeStorageTable(table, viewFields, storageTable.get());
analysis.setMaterializedViewStorageTableFields(table, storageTableFields);
}
else {
analysis.registerNamedQuery(table, query);
}

Scope accessControlScope = Scope.builder()
.withRelationType(RelationId.anonymous(), new RelationType(viewFields))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.trino.metadata.ViewPropertyManager;
import io.trino.security.AccessControl;
import io.trino.spi.connector.CatalogSchemaName;
import io.trino.spi.connector.ConnectorMaterializedViewDefinition;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.function.FunctionKind;
Expand All @@ -61,6 +62,7 @@
import io.trino.sql.tree.Cast;
import io.trino.sql.tree.ColumnDefinition;
import io.trino.sql.tree.CreateMaterializedView;
import io.trino.sql.tree.CreateMaterializedView.WhenStaleBehavior;
import io.trino.sql.tree.CreateSchema;
import io.trino.sql.tree.CreateTable;
import io.trino.sql.tree.CreateView;
Expand Down Expand Up @@ -562,11 +564,21 @@ private Query showCreateMaterializedView(ShowCreate node)
false,
false,
Optional.empty(), // TODO support GRACE PERIOD
viewDefinition.flatMap(MaterializedViewDefinition::getWhenStaleBehavior)
.map(Visitor::toSqlWhenStaleBehavior),
propertyNodes,
viewDefinition.get().getComment())).trim();
return singleValueQuery("Create Materialized View", sql);
}

private static WhenStaleBehavior toSqlWhenStaleBehavior(ConnectorMaterializedViewDefinition.WhenStaleBehavior whenStale)
{
return switch (whenStale) {
case INLINE -> WhenStaleBehavior.INLINE;
case FAIL -> WhenStaleBehavior.FAIL;
};
}

private Query showCreateView(ShowCreate node)
{
QualifiedObjectName objectName = createQualifiedObjectName(session, node, node.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ protected MaterializedViewDefinition someMaterializedView(String sql, List<ViewC
columns,
Optional.empty(),
Optional.empty(),
Optional.empty(),
Identity.ofUser("owner"),
ImmutableList.of(),
Optional.empty());
Expand Down Expand Up @@ -553,6 +554,7 @@ public void setMaterializedViewColumnComment(Session session, QualifiedObjectNam
.map(currentViewColumn -> columnName.equals(currentViewColumn.name()) ? new ViewColumn(currentViewColumn.name(), currentViewColumn.type(), comment) : currentViewColumn)
.collect(toImmutableList()),
view.getGracePeriod(),
view.getWhenStaleBehavior(),
view.getComment(),
view.getRunAsIdentity().get(),
view.getPath(),
Expand Down
Loading