Skip to content

Commit ce1cf50

Browse files
committed
Add support for row-level modification in Cassandra connector
The WHERE clause in an UPDATE statement for Cassandra must specify the full primary key (all columns in the partition key and all columns in the clustering key, if any). The conditions in the WHERE clause must be EQ conditions.
1 parent 39da6f4 commit ce1cf50

File tree

9 files changed

+362
-44
lines changed

9 files changed

+362
-44
lines changed

plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraClusteringPredicatesExtractor.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,20 @@ public TupleDomain<ColumnHandle> getUnenforcedConstraints()
5353
return predicates.filter((columnHandle, domain) -> !clusteringPushDownResult.hasBeenFullyPushed(columnHandle));
5454
}
5555

56+
public boolean includesAllClusteringColumnsAndHasAllEqPredicates()
57+
{
58+
return clusteringPushDownResult.includesAllClusteringColumnsAndHasAllEqPredicates;
59+
}
60+
5661
private ClusteringPushDownResult getClusteringKeysSet(List<CassandraColumnHandle> clusteringColumns, TupleDomain<ColumnHandle> predicates)
5762
{
5863
ImmutableSet.Builder<ColumnHandle> fullyPushedColumnPredicates = ImmutableSet.builder();
5964
ImmutableList.Builder<String> clusteringColumnSql = ImmutableList.builder();
65+
int eqPredicatesCount = 0;
6066
for (CassandraColumnHandle columnHandle : clusteringColumns) {
67+
if (predicates.getDomains().isEmpty()) {
68+
break;
69+
}
6170
Domain domain = predicates.getDomains().get().get(columnHandle);
6271
if (domain == null) {
6372
break;
@@ -109,10 +118,13 @@ private ClusteringPushDownResult getClusteringKeysSet(List<CassandraColumnHandle
109118
if (predicateString.contains(">") || predicateString.contains("<")) {
110119
break;
111120
}
121+
else {
122+
eqPredicatesCount++;
123+
}
112124
}
113125
List<String> clusteringColumnPredicates = clusteringColumnSql.build();
114126

115-
return new ClusteringPushDownResult(fullyPushedColumnPredicates.build(), Joiner.on(" AND ").join(clusteringColumnPredicates));
127+
return new ClusteringPushDownResult(fullyPushedColumnPredicates.build(), Joiner.on(" AND ").join(clusteringColumnPredicates), eqPredicatesCount == clusteringColumns.size());
116128
}
117129

118130
private String toCqlLiteral(CassandraColumnHandle columnHandle, Object value)
@@ -163,7 +175,7 @@ private String translateRangeIntoCql(CassandraColumnHandle columnHandle, Range r
163175
return upperBoundPredicate;
164176
}
165177

166-
private record ClusteringPushDownResult(Set<ColumnHandle> fullyPushedColumnPredicates, String domainQuery)
178+
private record ClusteringPushDownResult(Set<ColumnHandle> fullyPushedColumnPredicates, String domainQuery, boolean includesAllClusteringColumnsAndHasAllEqPredicates)
167179
{
168180
private ClusteringPushDownResult
169181
{

plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraMetadata.java

Lines changed: 79 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,13 @@
4242
import io.trino.spi.connector.RelationColumnsMetadata;
4343
import io.trino.spi.connector.RelationCommentMetadata;
4444
import io.trino.spi.connector.RetryMode;
45+
import io.trino.spi.connector.RowChangeParadigm;
4546
import io.trino.spi.connector.SaveMode;
4647
import io.trino.spi.connector.SchemaNotFoundException;
4748
import io.trino.spi.connector.SchemaTableName;
4849
import io.trino.spi.connector.TableFunctionApplicationResult;
4950
import io.trino.spi.connector.TableNotFoundException;
51+
import io.trino.spi.expression.Constant;
5052
import io.trino.spi.function.table.ConnectorTableFunctionHandle;
5153
import io.trino.spi.predicate.TupleDomain;
5254
import io.trino.spi.statistics.ComputedStatistics;
@@ -78,6 +80,7 @@
7880
import static io.trino.spi.connector.RelationColumnsMetadata.forTable;
7981
import static io.trino.spi.connector.RelationCommentMetadata.forRelation;
8082
import static io.trino.spi.connector.RetryMode.NO_RETRIES;
83+
import static io.trino.spi.connector.RowChangeParadigm.CHANGE_ONLY_UPDATED_COLUMNS;
8184
import static io.trino.spi.connector.SaveMode.REPLACE;
8285
import static java.lang.String.format;
8386
import static java.util.Locale.ENGLISH;
@@ -285,6 +288,7 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
285288

286289
String clusteringKeyPredicates = "";
287290
TupleDomain<ColumnHandle> unenforcedConstraint;
291+
Boolean includesAllClusteringColumnsAndHasAllEqPredicates = null;
288292
if (partitionResult.unpartitioned() || partitionResult.indexedColumnPredicatePushdown()) {
289293
// When the filter is missing at least one of the partition keys or when the table is not partitioned,
290294
// use the raw unenforced constraint of the partitionResult
@@ -297,6 +301,7 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
297301
partitionResult.unenforcedConstraint());
298302
clusteringKeyPredicates = clusteringPredicatesExtractor.getClusteringKeyPredicates();
299303
unenforcedConstraint = clusteringPredicatesExtractor.getUnenforcedConstraints();
304+
includesAllClusteringColumnsAndHasAllEqPredicates = clusteringPredicatesExtractor.includesAllClusteringColumnsAndHasAllEqPredicates();
300305
}
301306

302307
Optional<List<CassandraPartition>> currentPartitions = handle.getPartitions();
@@ -313,7 +318,8 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
313318
handle.getTableName(),
314319
Optional.of(partitionResult.partitions()),
315320
// TODO this should probably be AND-ed with handle.getClusteringKeyPredicates()
316-
clusteringKeyPredicates)),
321+
clusteringKeyPredicates,
322+
includesAllClusteringColumnsAndHasAllEqPredicates)),
317323
unenforcedConstraint,
318324
constraint.getExpression(),
319325
false));
@@ -470,39 +476,103 @@ private static boolean isHiddenIdColumn(CassandraColumnHandle columnHandle)
470476
}
471477

472478
@Override
473-
public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorTableHandle tableHandle, Map<Integer, Collection<ColumnHandle>> updateCaseColumns, RetryMode retryMode)
479+
public RowChangeParadigm getRowChangeParadigm(ConnectorSession session, ConnectorTableHandle tableHandle)
474480
{
475-
throw new TrinoException(NOT_SUPPORTED, "Delete without primary key or partition key is not supported");
481+
return CHANGE_ONLY_UPDATED_COLUMNS;
476482
}
477483

484+
/**
485+
* Attempt to push down an update operation into the connector. If a connector
486+
* can execute an update for the table handle on its own, it should return a
487+
* table handle, which will be passed back to {@link #executeUpdate} during
488+
* query executing to actually execute the update.
489+
*/
478490
@Override
479-
public ColumnHandle getMergeRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
491+
public Optional<ConnectorTableHandle> applyUpdate(ConnectorSession session, ConnectorTableHandle tableHandle, Map<ColumnHandle, Constant> assignments)
480492
{
481-
return new CassandraColumnHandle("$update_row_id", 0, CassandraTypes.TEXT, false, false, false, true);
493+
CassandraNamedRelationHandle table = ((CassandraTableHandle) tableHandle).getRequiredNamedRelation();
494+
if (cassandraSession.isMaterializedView(table.getSchemaTableName())) {
495+
throw new TrinoException(NOT_SUPPORTED, "Updating materialized views not yet supported");
496+
}
497+
498+
CassandraNamedRelationHandle handle = ((CassandraTableHandle) tableHandle).getRequiredNamedRelation();
499+
List<CassandraPartition> partitions = handle.getPartitions()
500+
.orElseThrow(() -> new TrinoException(NOT_SUPPORTED, "Updating without partition key is not supported"));
501+
if (partitions.isEmpty()) {
502+
// there are no records of a given partition key
503+
throw new TrinoException(NOT_SUPPORTED, "Updating without partition key is not supported");
504+
}
505+
if (!handle.getIncludesAllClusteringColumnsAndHasAllEqPredicates()) {
506+
throw new TrinoException(NOT_SUPPORTED, "Updating without all clustering keys or with non-eq predicates is not supported");
507+
}
508+
509+
Map<String, String> assignmentsMap = new HashMap<>();
510+
for (Map.Entry<ColumnHandle, Constant> entry : assignments.entrySet()) {
511+
CassandraColumnHandle column = (CassandraColumnHandle) entry.getKey();
512+
if (isHiddenIdColumn(column)) {
513+
throw new TrinoException(NOT_SUPPORTED, "Updating the hidden id column is not supported");
514+
}
515+
Object value = entry.getValue().getValue();
516+
if (value == null) {
517+
throw new TrinoException(NOT_SUPPORTED, "Updating columns to null is not supported");
518+
}
519+
String cqlLiteral = cassandraTypeManager.toCqlLiteral(column.cassandraType(), value); // validate that the value can be converted to the cassandra type
520+
assignmentsMap.put(column.name(), cqlLiteral);
521+
}
522+
523+
return Optional.of(new CassandraUpdateTableHandle(tableHandle, assignmentsMap));
482524
}
483525

526+
/**
527+
* Execute the update operation on the handle returned from {@link #applyUpdate}.
528+
*/
484529
@Override
485-
public Optional<ConnectorTableHandle> applyDelete(ConnectorSession session, ConnectorTableHandle handle)
530+
public OptionalLong executeUpdate(ConnectorSession session, ConnectorTableHandle tableHandle)
486531
{
487-
return Optional.of(handle);
532+
CassandraUpdateTableHandle updateTableHandle = ((CassandraUpdateTableHandle) tableHandle);
533+
CassandraTableHandle cassandraTableHandle = (CassandraTableHandle) updateTableHandle.tableHandle();
534+
CassandraNamedRelationHandle namedRelationHandle = cassandraTableHandle.getRequiredNamedRelation();
535+
for (String cql : CassandraCqlUtils.getUpdateQueries(namedRelationHandle, updateTableHandle.assignments())) {
536+
cassandraSession.execute(cql);
537+
}
538+
return OptionalLong.empty();
488539
}
489540

490541
@Override
491-
public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle deleteHandle)
542+
public ColumnHandle getMergeRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
543+
{
544+
return new CassandraColumnHandle("$update_row_id", 0, CassandraTypes.TEXT, false, false, false, true);
545+
}
546+
547+
@Override
548+
public Optional<ConnectorTableHandle> applyDelete(ConnectorSession session, ConnectorTableHandle deleteHandle)
492549
{
493550
CassandraNamedRelationHandle handle = ((CassandraTableHandle) deleteHandle).getRequiredNamedRelation();
494551
List<CassandraPartition> partitions = handle.getPartitions()
495552
.orElseThrow(() -> new TrinoException(NOT_SUPPORTED, "Deleting without partition key is not supported"));
496553
if (partitions.isEmpty()) {
497554
// there are no records of a given partition key
498-
return OptionalLong.empty();
555+
return Optional.empty();
499556
}
557+
return Optional.of(deleteHandle);
558+
}
559+
560+
@Override
561+
public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle deleteHandle)
562+
{
563+
CassandraNamedRelationHandle handle = ((CassandraTableHandle) deleteHandle).getRequiredNamedRelation();
500564
for (String cql : CassandraCqlUtils.getDeleteQueries(handle)) {
501565
cassandraSession.execute(cql);
502566
}
503567
return OptionalLong.empty();
504568
}
505569

570+
@Override
571+
public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorTableHandle tableHandle, Map<Integer, Collection<ColumnHandle>> updateCaseColumns, RetryMode retryMode)
572+
{
573+
throw new TrinoException(NOT_SUPPORTED, "This connector does not support modifying table rows");
574+
}
575+
506576
@Override
507577
public Optional<TableFunctionApplicationResult<ConnectorTableHandle>> applyTableFunction(ConnectorSession session, ConnectorTableFunctionHandle handle)
508578
{

plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraNamedRelationHandle.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,23 +34,26 @@ public class CassandraNamedRelationHandle
3434
private final String tableName;
3535
private final Optional<List<CassandraPartition>> partitions;
3636
private final String clusteringKeyPredicates;
37+
private final Boolean includesAllClusteringColumnsAndHasAllEqPredicates;
3738

3839
public CassandraNamedRelationHandle(String schemaName, String tableName)
3940
{
40-
this(schemaName, tableName, Optional.empty(), "");
41+
this(schemaName, tableName, Optional.empty(), "", null);
4142
}
4243

4344
@JsonCreator
4445
public CassandraNamedRelationHandle(
4546
@JsonProperty("schemaName") String schemaName,
4647
@JsonProperty("tableName") String tableName,
4748
@JsonProperty("partitions") Optional<List<CassandraPartition>> partitions,
48-
@JsonProperty("clusteringKeyPredicates") String clusteringKeyPredicates)
49+
@JsonProperty("clusteringKeyPredicates") String clusteringKeyPredicates,
50+
@JsonProperty("includesAllClusteringColumnsAndHasAllEqPredicates") Boolean includesAllClusteringColumnsAndHasAllEqPredicates)
4951
{
5052
this.schemaName = requireNonNull(schemaName, "schemaName is null");
5153
this.tableName = requireNonNull(tableName, "tableName is null");
5254
this.partitions = partitions.map(ImmutableList::copyOf);
5355
this.clusteringKeyPredicates = requireNonNull(clusteringKeyPredicates, "clusteringKeyPredicates is null");
56+
this.includesAllClusteringColumnsAndHasAllEqPredicates = includesAllClusteringColumnsAndHasAllEqPredicates;
5457
}
5558

5659
@JsonProperty
@@ -77,6 +80,12 @@ public String getClusteringKeyPredicates()
7780
return clusteringKeyPredicates;
7881
}
7982

83+
@JsonProperty
84+
public Boolean getIncludesAllClusteringColumnsAndHasAllEqPredicates()
85+
{
86+
return includesAllClusteringColumnsAndHasAllEqPredicates;
87+
}
88+
8089
public SchemaTableName getSchemaTableName()
8190
{
8291
return new SchemaTableName(schemaName, tableName);
@@ -101,7 +110,8 @@ public boolean equals(Object obj)
101110
return Objects.equals(this.schemaName, other.schemaName) &&
102111
Objects.equals(this.tableName, other.tableName) &&
103112
Objects.equals(this.partitions, other.partitions) &&
104-
Objects.equals(this.clusteringKeyPredicates, other.clusteringKeyPredicates);
113+
Objects.equals(this.clusteringKeyPredicates, other.clusteringKeyPredicates) &&
114+
Objects.equals(this.includesAllClusteringColumnsAndHasAllEqPredicates, other.includesAllClusteringColumnsAndHasAllEqPredicates);
105115
}
106116

107117
@Override
@@ -122,6 +132,9 @@ public String toString()
122132
if (!clusteringKeyPredicates.isEmpty()) {
123133
result += format(" constraint(%s)", clusteringKeyPredicates);
124134
}
135+
if (includesAllClusteringColumnsAndHasAllEqPredicates != null) {
136+
result += format(" includesAllClusteringColumnsAndHasAllEqPredicates(%s)", includesAllClusteringColumnsAndHasAllEqPredicates);
137+
}
125138
return result;
126139
}
127140
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.cassandra;
15+
16+
import io.trino.spi.connector.ConnectorTableHandle;
17+
18+
import java.util.Map;
19+
20+
import static java.util.Objects.requireNonNull;
21+
22+
public record CassandraUpdateTableHandle(
23+
ConnectorTableHandle tableHandle,
24+
Map<String, String> assignments)
25+
implements ConnectorTableHandle
26+
{
27+
public CassandraUpdateTableHandle
28+
{
29+
requireNonNull(tableHandle, "tableHandle is null");
30+
requireNonNull(assignments, "assignments is null");
31+
}
32+
33+
@Override
34+
public String toString()
35+
{
36+
return "cassandra:" + tableHandle;
37+
}
38+
}

plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/util/CassandraCqlUtils.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import java.util.ArrayList;
2626
import java.util.List;
27+
import java.util.Map;
2728

2829
import static com.datastax.oss.driver.internal.core.util.Strings.doubleQuote;
2930
import static com.google.common.collect.ImmutableList.toImmutableList;
@@ -113,6 +114,33 @@ private static String getWhereCondition(String partition, String clusteringKeyPr
113114
return String.join(" AND ", conditions);
114115
}
115116

117+
private static String getSetStatements(Map<String, String> assignments)
118+
{
119+
if (!assignments.isEmpty()) {
120+
List<String> setStatements = new ArrayList<>();
121+
for (Map.Entry<String, String> entry : assignments.entrySet()) {
122+
setStatements.add(format("%s = %s", validColumnName(entry.getKey()), entry.getValue()));
123+
}
124+
return String.join(" AND ", setStatements);
125+
}
126+
return "";
127+
}
128+
129+
private static String update(String schemaName, String tableName, CassandraPartition partition, String clusteringKeyPredicates, Map<String, String> assignments)
130+
{
131+
return format("UPDATE \"%s\".\"%s\" SET %s WHERE %s",
132+
schemaName, tableName, getSetStatements(assignments), getWhereCondition(partition.getPartitionId(), clusteringKeyPredicates));
133+
}
134+
135+
public static List<String> getUpdateQueries(CassandraNamedRelationHandle handle, Map<String, String> assignments)
136+
{
137+
ImmutableList.Builder<String> queries = ImmutableList.builder();
138+
for (CassandraPartition partition : handle.getPartitions().orElse(ImmutableList.of())) {
139+
queries.add(update(handle.getSchemaName(), handle.getTableName(), partition, handle.getClusteringKeyPredicates(), assignments));
140+
}
141+
return queries.build();
142+
}
143+
116144
private static String deleteFrom(String schemaName, String tableName, CassandraPartition partition, String clusteringKeyPredicates)
117145
{
118146
return format("DELETE FROM \"%s\".\"%s\" WHERE %s",

plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -462,7 +462,7 @@ private static Map<String, Integer> indexColumns(List<ColumnHandle> columnHandle
462462
private CassandraTableHandle getTableHandle(Optional<List<CassandraPartition>> partitions, String clusteringKeyPredicates)
463463
{
464464
CassandraNamedRelationHandle handle = ((CassandraTableHandle) getTableHandle(tableForDelete)).getRequiredNamedRelation();
465-
return new CassandraTableHandle(new CassandraNamedRelationHandle(handle.getSchemaName(), handle.getTableName(), partitions, clusteringKeyPredicates));
465+
return new CassandraTableHandle(new CassandraNamedRelationHandle(handle.getSchemaName(), handle.getTableName(), partitions, clusteringKeyPredicates, null));
466466
}
467467

468468
private CassandraPartition createPartition(long value1, long value2)

0 commit comments

Comments
 (0)