Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,20 @@ public TupleDomain<ColumnHandle> getUnenforcedConstraints()
return predicates.filter((columnHandle, domain) -> !clusteringPushDownResult.hasBeenFullyPushed(columnHandle));
}

public boolean includesAllClusteringColumnsAndHasAllEqPredicates()
{
return clusteringPushDownResult.includesAllClusteringColumnsAndHasAllEqPredicates;
}

private ClusteringPushDownResult getClusteringKeysSet(List<CassandraColumnHandle> clusteringColumns, TupleDomain<ColumnHandle> predicates)
{
ImmutableSet.Builder<ColumnHandle> fullyPushedColumnPredicates = ImmutableSet.builder();
ImmutableList.Builder<String> clusteringColumnSql = ImmutableList.builder();
int eqPredicatesCount = 0;
for (CassandraColumnHandle columnHandle : clusteringColumns) {
if (predicates.getDomains().isEmpty()) {
break;
}
Domain domain = predicates.getDomains().get().get(columnHandle);
if (domain == null) {
break;
Expand Down Expand Up @@ -109,10 +118,13 @@ private ClusteringPushDownResult getClusteringKeysSet(List<CassandraColumnHandle
if (predicateString.contains(">") || predicateString.contains("<")) {
break;
}
else {
eqPredicatesCount++;
}
}
List<String> clusteringColumnPredicates = clusteringColumnSql.build();

return new ClusteringPushDownResult(fullyPushedColumnPredicates.build(), Joiner.on(" AND ").join(clusteringColumnPredicates));
return new ClusteringPushDownResult(fullyPushedColumnPredicates.build(), Joiner.on(" AND ").join(clusteringColumnPredicates), eqPredicatesCount == clusteringColumns.size());
}

private String toCqlLiteral(CassandraColumnHandle columnHandle, Object value)
Expand Down Expand Up @@ -163,7 +175,7 @@ private String translateRangeIntoCql(CassandraColumnHandle columnHandle, Range r
return upperBoundPredicate;
}

private record ClusteringPushDownResult(Set<ColumnHandle> fullyPushedColumnPredicates, String domainQuery)
private record ClusteringPushDownResult(Set<ColumnHandle> fullyPushedColumnPredicates, String domainQuery, boolean includesAllClusteringColumnsAndHasAllEqPredicates)
{
private ClusteringPushDownResult
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@
import io.trino.spi.connector.RelationColumnsMetadata;
import io.trino.spi.connector.RelationCommentMetadata;
import io.trino.spi.connector.RetryMode;
import io.trino.spi.connector.RowChangeParadigm;
import io.trino.spi.connector.SaveMode;
import io.trino.spi.connector.SchemaNotFoundException;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableFunctionApplicationResult;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.expression.Constant;
import io.trino.spi.function.table.ConnectorTableFunctionHandle;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.statistics.ComputedStatistics;
Expand Down Expand Up @@ -78,6 +80,7 @@
import static io.trino.spi.connector.RelationColumnsMetadata.forTable;
import static io.trino.spi.connector.RelationCommentMetadata.forRelation;
import static io.trino.spi.connector.RetryMode.NO_RETRIES;
import static io.trino.spi.connector.RowChangeParadigm.CHANGE_ONLY_UPDATED_COLUMNS;
import static io.trino.spi.connector.SaveMode.REPLACE;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
Expand Down Expand Up @@ -285,6 +288,7 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C

String clusteringKeyPredicates = "";
TupleDomain<ColumnHandle> unenforcedConstraint;
Boolean includesAllClusteringColumnsAndHasAllEqPredicates = null;
if (partitionResult.unpartitioned() || partitionResult.indexedColumnPredicatePushdown()) {
// When the filter is missing at least one of the partition keys or when the table is not partitioned,
// use the raw unenforced constraint of the partitionResult
Expand All @@ -297,6 +301,7 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
partitionResult.unenforcedConstraint());
clusteringKeyPredicates = clusteringPredicatesExtractor.getClusteringKeyPredicates();
unenforcedConstraint = clusteringPredicatesExtractor.getUnenforcedConstraints();
includesAllClusteringColumnsAndHasAllEqPredicates = clusteringPredicatesExtractor.includesAllClusteringColumnsAndHasAllEqPredicates();
}

Optional<List<CassandraPartition>> currentPartitions = handle.getPartitions();
Expand All @@ -313,7 +318,8 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
handle.getTableName(),
Optional.of(partitionResult.partitions()),
// TODO this should probably be AND-ed with handle.getClusteringKeyPredicates()
clusteringKeyPredicates)),
clusteringKeyPredicates,
includesAllClusteringColumnsAndHasAllEqPredicates)),
unenforcedConstraint,
constraint.getExpression(),
false));
Expand Down Expand Up @@ -470,39 +476,103 @@ private static boolean isHiddenIdColumn(CassandraColumnHandle columnHandle)
}

@Override
public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorTableHandle tableHandle, Map<Integer, Collection<ColumnHandle>> updateCaseColumns, RetryMode retryMode)
public RowChangeParadigm getRowChangeParadigm(ConnectorSession session, ConnectorTableHandle tableHandle)
{
throw new TrinoException(NOT_SUPPORTED, "Delete without primary key or partition key is not supported");
return CHANGE_ONLY_UPDATED_COLUMNS;
}

/**
* Attempt to push down an update operation into the connector. If a connector
* can execute an update for the table handle on its own, it should return a
* table handle, which will be passed back to {@link #executeUpdate} during
* query executing to actually execute the update.
*/
@Override
public ColumnHandle getMergeRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
public Optional<ConnectorTableHandle> applyUpdate(ConnectorSession session, ConnectorTableHandle tableHandle, Map<ColumnHandle, Constant> assignments)
{
return new CassandraColumnHandle("$update_row_id", 0, CassandraTypes.TEXT, false, false, false, true);
CassandraNamedRelationHandle table = ((CassandraTableHandle) tableHandle).getRequiredNamedRelation();
if (cassandraSession.isMaterializedView(table.getSchemaTableName())) {
throw new TrinoException(NOT_SUPPORTED, "Updating materialized views not yet supported");
}

CassandraNamedRelationHandle handle = ((CassandraTableHandle) tableHandle).getRequiredNamedRelation();
List<CassandraPartition> partitions = handle.getPartitions()
.orElseThrow(() -> new TrinoException(NOT_SUPPORTED, "Updating without partition key is not supported"));
if (partitions.isEmpty()) {
// there are no records of a given partition key
throw new TrinoException(NOT_SUPPORTED, "Updating without partition key is not supported");
}
if (!handle.getIncludesAllClusteringColumnsAndHasAllEqPredicates()) {
throw new TrinoException(NOT_SUPPORTED, "Updating without all clustering keys or with non-eq predicates is not supported");
}

Map<String, String> assignmentsMap = new HashMap<>();
for (Map.Entry<ColumnHandle, Constant> entry : assignments.entrySet()) {
CassandraColumnHandle column = (CassandraColumnHandle) entry.getKey();
if (isHiddenIdColumn(column)) {
throw new TrinoException(NOT_SUPPORTED, "Updating the hidden id column is not supported");
}
Object value = entry.getValue().getValue();
if (value == null) {
throw new TrinoException(NOT_SUPPORTED, "Updating columns to null is not supported");
}
String cqlLiteral = cassandraTypeManager.toCqlLiteral(column.cassandraType(), value); // validate that the value can be converted to the cassandra type
assignmentsMap.put(column.name(), cqlLiteral);
}

return Optional.of(new CassandraUpdateTableHandle(tableHandle, assignmentsMap));
}

/**
* Execute the update operation on the handle returned from {@link #applyUpdate}.
*/
@Override
public Optional<ConnectorTableHandle> applyDelete(ConnectorSession session, ConnectorTableHandle handle)
public OptionalLong executeUpdate(ConnectorSession session, ConnectorTableHandle tableHandle)
{
return Optional.of(handle);
CassandraUpdateTableHandle updateTableHandle = ((CassandraUpdateTableHandle) tableHandle);
CassandraTableHandle cassandraTableHandle = (CassandraTableHandle) updateTableHandle.tableHandle();
CassandraNamedRelationHandle namedRelationHandle = cassandraTableHandle.getRequiredNamedRelation();
for (String cql : CassandraCqlUtils.getUpdateQueries(namedRelationHandle, updateTableHandle.assignments())) {
cassandraSession.execute(cql);
}
return OptionalLong.empty();
}

@Override
public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle deleteHandle)
public ColumnHandle getMergeRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
{
return new CassandraColumnHandle("$update_row_id", 0, CassandraTypes.TEXT, false, false, false, true);
}

@Override
public Optional<ConnectorTableHandle> applyDelete(ConnectorSession session, ConnectorTableHandle deleteHandle)
{
CassandraNamedRelationHandle handle = ((CassandraTableHandle) deleteHandle).getRequiredNamedRelation();
List<CassandraPartition> partitions = handle.getPartitions()
.orElseThrow(() -> new TrinoException(NOT_SUPPORTED, "Deleting without partition key is not supported"));
if (partitions.isEmpty()) {
// there are no records of a given partition key
return OptionalLong.empty();
return Optional.empty();
}
return Optional.of(deleteHandle);
}

@Override
public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle deleteHandle)
{
CassandraNamedRelationHandle handle = ((CassandraTableHandle) deleteHandle).getRequiredNamedRelation();
for (String cql : CassandraCqlUtils.getDeleteQueries(handle)) {
cassandraSession.execute(cql);
}
return OptionalLong.empty();
}

@Override
public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorTableHandle tableHandle, Map<Integer, Collection<ColumnHandle>> updateCaseColumns, RetryMode retryMode)
{
throw new TrinoException(NOT_SUPPORTED, "This connector does not support modifying table rows");
}

@Override
public Optional<TableFunctionApplicationResult<ConnectorTableHandle>> applyTableFunction(ConnectorSession session, ConnectorTableFunctionHandle handle)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,26 @@ public class CassandraNamedRelationHandle
private final String tableName;
private final Optional<List<CassandraPartition>> partitions;
private final String clusteringKeyPredicates;
private final Boolean includesAllClusteringColumnsAndHasAllEqPredicates;

public CassandraNamedRelationHandle(String schemaName, String tableName)
{
this(schemaName, tableName, Optional.empty(), "");
this(schemaName, tableName, Optional.empty(), "", null);
}

@JsonCreator
public CassandraNamedRelationHandle(
@JsonProperty("schemaName") String schemaName,
@JsonProperty("tableName") String tableName,
@JsonProperty("partitions") Optional<List<CassandraPartition>> partitions,
@JsonProperty("clusteringKeyPredicates") String clusteringKeyPredicates)
@JsonProperty("clusteringKeyPredicates") String clusteringKeyPredicates,
@JsonProperty("includesAllClusteringColumnsAndHasAllEqPredicates") Boolean includesAllClusteringColumnsAndHasAllEqPredicates)
{
this.schemaName = requireNonNull(schemaName, "schemaName is null");
this.tableName = requireNonNull(tableName, "tableName is null");
this.partitions = partitions.map(ImmutableList::copyOf);
this.clusteringKeyPredicates = requireNonNull(clusteringKeyPredicates, "clusteringKeyPredicates is null");
this.includesAllClusteringColumnsAndHasAllEqPredicates = includesAllClusteringColumnsAndHasAllEqPredicates;
}

@JsonProperty
Expand All @@ -77,6 +80,12 @@ public String getClusteringKeyPredicates()
return clusteringKeyPredicates;
}

@JsonProperty
public Boolean getIncludesAllClusteringColumnsAndHasAllEqPredicates()
{
return includesAllClusteringColumnsAndHasAllEqPredicates;
}

public SchemaTableName getSchemaTableName()
{
return new SchemaTableName(schemaName, tableName);
Expand All @@ -101,7 +110,8 @@ public boolean equals(Object obj)
return Objects.equals(this.schemaName, other.schemaName) &&
Objects.equals(this.tableName, other.tableName) &&
Objects.equals(this.partitions, other.partitions) &&
Objects.equals(this.clusteringKeyPredicates, other.clusteringKeyPredicates);
Objects.equals(this.clusteringKeyPredicates, other.clusteringKeyPredicates) &&
Objects.equals(this.includesAllClusteringColumnsAndHasAllEqPredicates, other.includesAllClusteringColumnsAndHasAllEqPredicates);
}

@Override
Expand All @@ -122,6 +132,9 @@ public String toString()
if (!clusteringKeyPredicates.isEmpty()) {
result += format(" constraint(%s)", clusteringKeyPredicates);
}
if (includesAllClusteringColumnsAndHasAllEqPredicates != null) {
result += format(" includesAllClusteringColumnsAndHasAllEqPredicates(%s)", includesAllClusteringColumnsAndHasAllEqPredicates);
}
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.cassandra;

import io.trino.spi.connector.ConnectorTableHandle;

import java.util.Map;

import static java.util.Objects.requireNonNull;

public record CassandraUpdateTableHandle(
ConnectorTableHandle tableHandle,
Map<String, String> assignments)
implements ConnectorTableHandle
{
public CassandraUpdateTableHandle
{
requireNonNull(tableHandle, "tableHandle is null");
requireNonNull(assignments, "assignments is null");
}

@Override
public String toString()
{
return "cassandra:" + tableHandle;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static com.datastax.oss.driver.internal.core.util.Strings.doubleQuote;
import static com.google.common.collect.ImmutableList.toImmutableList;
Expand Down Expand Up @@ -113,6 +114,33 @@ private static String getWhereCondition(String partition, String clusteringKeyPr
return String.join(" AND ", conditions);
}

private static String getSetStatements(Map<String, String> assignments)
{
if (!assignments.isEmpty()) {
List<String> setStatements = new ArrayList<>();
for (Map.Entry<String, String> entry : assignments.entrySet()) {
setStatements.add(format("%s = %s", validColumnName(entry.getKey()), entry.getValue()));
}
return String.join(" AND ", setStatements);
}
return "";
}

private static String update(String schemaName, String tableName, CassandraPartition partition, String clusteringKeyPredicates, Map<String, String> assignments)
{
return format("UPDATE \"%s\".\"%s\" SET %s WHERE %s",
schemaName, tableName, getSetStatements(assignments), getWhereCondition(partition.getPartitionId(), clusteringKeyPredicates));
}

public static List<String> getUpdateQueries(CassandraNamedRelationHandle handle, Map<String, String> assignments)
{
ImmutableList.Builder<String> queries = ImmutableList.builder();
for (CassandraPartition partition : handle.getPartitions().orElse(ImmutableList.of())) {
queries.add(update(handle.getSchemaName(), handle.getTableName(), partition, handle.getClusteringKeyPredicates(), assignments));
}
return queries.build();
}

private static String deleteFrom(String schemaName, String tableName, CassandraPartition partition, String clusteringKeyPredicates)
{
return format("DELETE FROM \"%s\".\"%s\" WHERE %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ private static Map<String, Integer> indexColumns(List<ColumnHandle> columnHandle
private CassandraTableHandle getTableHandle(Optional<List<CassandraPartition>> partitions, String clusteringKeyPredicates)
{
CassandraNamedRelationHandle handle = ((CassandraTableHandle) getTableHandle(tableForDelete)).getRequiredNamedRelation();
return new CassandraTableHandle(new CassandraNamedRelationHandle(handle.getSchemaName(), handle.getTableName(), partitions, clusteringKeyPredicates));
return new CassandraTableHandle(new CassandraNamedRelationHandle(handle.getSchemaName(), handle.getTableName(), partitions, clusteringKeyPredicates, null));
}

private CassandraPartition createPartition(long value1, long value2)
Expand Down
Loading