1 change: 1 addition & 0 deletions .github/workflows/ci.yml
@@ -4,6 +4,7 @@ on:
push:
branches:
- master
- 'repro-*'
pull_request:
paths-ignore:
- 'docs/**'
61 changes: 61 additions & 0 deletions .github/workflows/trino-cassandra.yml
@@ -0,0 +1,61 @@
name: Build trino-cassandra

on:
push:
branches:
- "repro-*"

env:
# An envar that signals to tests we are executing in the CI environment
CONTINUOUS_INTEGRATION: true
MAVEN_OPTS: "-Xmx1024M -XX:+ExitOnOutOfMemoryError"
MAVEN_INSTALL_OPTS: "-Xmx2G -XX:+ExitOnOutOfMemoryError"
MAVEN_FAST_INSTALL: "-B -V --quiet -T 1C -DskipTests -Dair.check.skip-all -Dmaven.javadoc.skip=true"
MAVEN_TEST: "-B -Dair.check.skip-all -Dmaven.javadoc.skip=true -DLogTestDurationListener.enabled=true --fail-at-end"
RETRY: .github/bin/retry

jobs:
trino-cassandra:
runs-on: ubuntu-latest
permissions:
id-token: write
contents: read
steps:
- uses: aws-actions/configure-aws-credentials@v5
with:
aws-region: ap-northeast-1
role-to-assume: ${{ vars.ROLE_TO_ASSUME }}
- uses: actions/checkout@v5
- uses: actions/setup-java@v5
with:
distribution: corretto
java-version: 22
- name: Cache local Maven repository
id: cache-maven
uses: actions/cache@v4
with:
path: ~/.m2/repository
key: ${{ runner.os }}-maven-2-${{ hashFiles('**/pom.xml') }}
restore-keys: |
${{ runner.os }}-maven-2-
- name: Populate maven cache
if: steps.cache-maven.outputs.cache-hit != 'true'
run: ./mvnw de.qaware.maven:go-offline-maven-plugin:resolve-dependencies
- name: Maven Install
run: |
export MAVEN_OPTS="${MAVEN_INSTALL_OPTS}"
./mvnw install ${MAVEN_FAST_INSTALL} -am -pl plugin/trino-cassandra
- name: Upload trino-cassandra.jar to S3
run: |
trino_cassandra_jar=trino-cassandra-$(yq .project.version pom.xml).jar
aws s3api put-object \
--bucket ${{ vars.DEPLOY_S3_BUCKET }} \
--key "${{ vars.DEPLOY_S3_KEY_PREFIX }}/${{ github.ref_name }}/${{ github.run_number }}/${trino_cassandra_jar}" \
--body "./plugin/trino-cassandra/target/${trino_cassandra_jar}"
- name: Upload trino-cassandra-services.jar to S3
run: |
trino_cassandra_services_jar=trino-cassandra-$(yq .project.version pom.xml)-services.jar
aws s3api put-object \
--bucket ${{ vars.DEPLOY_S3_BUCKET }} \
--key "${{ vars.DEPLOY_S3_KEY_PREFIX }}/${{ github.ref_name }}/${{ github.run_number }}/${trino_cassandra_services_jar}" \
--body "./plugin/trino-cassandra/target/${trino_cassandra_services_jar}"
@@ -83,6 +83,7 @@ public class CassandraClientConfig
private String keystorePassword;
private File truststorePath;
private String truststorePassword;
private boolean skipPartitionCheck;

@NotNull
@Size(min = 1)
@@ -441,4 +442,16 @@ public CassandraClientConfig setTruststorePassword(String truststorePassword)
this.truststorePassword = truststorePassword;
return this;
}

public boolean isSkipPartitionCheck()
{
return skipPartitionCheck;
}

@Config("cassandra.skip-partition-check")
public CassandraClientConfig setSkipPartitionCheck(boolean skipPartitionCheck)
{
this.skipPartitionCheck = skipPartitionCheck;
return this;
}
}
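For context, the new cassandra.skip-partition-check property follows the same getter/setter pattern as the existing options above. A minimal usage sketch (not part of the patch, using only the methods added in this file):

    CassandraClientConfig config = new CassandraClientConfig()
            .setSkipPartitionCheck(true);
    // When enabled, CassandraSession builds CassandraPartition objects directly from the
    // pushed-down filter prefixes instead of querying Cassandra to confirm that each
    // candidate partition exists (see the getPartitions change further down).
    boolean skip = config.isSkipPartitionCheck(); // true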
@@ -190,7 +190,8 @@ public static CassandraSession createCassandraSession(
CassandraTelemetry cassandraTelemetry = CassandraTelemetry.create(openTelemetry);
return cassandraTelemetry.wrap(cqlSessionBuilder.build());
},
config.getNoHostAvailableRetryTimeout());
config.getNoHostAvailableRetryTimeout(),
config.isSkipPartitionCheck());
}

private static Optional<SSLContext> buildSslContext(
@@ -49,6 +49,7 @@
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.airlift.units.Duration;
import io.trino.plugin.cassandra.util.CassandraCqlUtils;
import io.trino.spi.TrinoException;
@@ -87,6 +88,7 @@
import static io.trino.plugin.cassandra.util.CassandraCqlUtils.validSchemaName;
import static io.trino.plugin.cassandra.util.CassandraCqlUtils.validTableName;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.Math.toIntExact;
import static java.lang.String.format;
import static java.util.Comparator.comparing;
import static java.util.Locale.ENGLISH;
@@ -106,6 +108,7 @@ public class CassandraSession
private final CassandraTypeManager cassandraTypeManager;
private final JsonCodec<List<ExtraColumnMetadata>> extraColumnMetadataCodec;
private final Duration noHostAvailableRetryTimeout;
private final boolean skipPartitionCheck;

@GuardedBy("this")
private Supplier<CqlSession> sessionSupplier;
@@ -116,12 +119,14 @@ public CassandraSession(
CassandraTypeManager cassandraTypeManager,
JsonCodec<List<ExtraColumnMetadata>> extraColumnMetadataCodec,
Supplier<CqlSession> sessionSupplier,
Duration noHostAvailableRetryTimeout)
Duration noHostAvailableRetryTimeout,
boolean skipPartitionCheck)
{
this.cassandraTypeManager = requireNonNull(cassandraTypeManager, "cassandraTypeManager is null");
this.extraColumnMetadataCodec = requireNonNull(extraColumnMetadataCodec, "extraColumnMetadataCodec is null");
this.noHostAvailableRetryTimeout = requireNonNull(noHostAvailableRetryTimeout, "noHostAvailableRetryTimeout is null");
this.sessionSupplier = requireNonNull(sessionSupplier, "sessionSupplier is null");
this.skipPartitionCheck = skipPartitionCheck;
}

private synchronized CqlSession session()
@@ -411,6 +416,9 @@ private Optional<CassandraColumnHandle> buildColumnHandle(RelationMetadata table
*/
public List<CassandraPartition> getPartitions(CassandraTable table, List<Set<Object>> filterPrefixes)
{
if (skipPartitionCheck) {
return buildPartitionsFromFilterPrefixes(table, filterPrefixes);
}
List<CassandraColumnHandle> partitionKeyColumns = table.partitionKeyColumns();

if (filterPrefixes.size() != partitionKeyColumns.size()) {
@@ -473,6 +481,90 @@ public List<CassandraPartition> getPartitions(CassandraTable table, List<Set<Obj
return partitions.build();
}

private List<CassandraPartition> buildPartitionsFromFilterPrefixes(CassandraTable table, List<Set<Object>> filterPrefixes)
{
List<CassandraColumnHandle> partitionKeyColumns = table.partitionKeyColumns();

if (filterPrefixes.size() != partitionKeyColumns.size()) {
return ImmutableList.of(CassandraPartition.UNPARTITIONED);
}

ByteBuffer buffer = ByteBuffer.allocate(1000);
HashMap<ColumnHandle, NullableValue> map = new HashMap<>();
Set<String> uniquePartitionIds = new HashSet<>();
StringBuilder stringBuilder = new StringBuilder();

boolean isComposite = partitionKeyColumns.size() > 1;

ImmutableList.Builder<CassandraPartition> partitions = ImmutableList.builder();
for (List<Object> values : Sets.cartesianProduct(filterPrefixes)) {
buffer.clear();
map.clear();
stringBuilder.setLength(0);
for (int i = 0; i < partitionKeyColumns.size(); i++) {
Object value = values.get(i);
CassandraColumnHandle columnHandle = partitionKeyColumns.get(i);
CassandraType cassandraType = columnHandle.cassandraType();

switch (cassandraType.kind()) {
case ASCII:
case TEXT:
case VARCHAR:
Slice slice = (Slice) value;
if (isComposite) {
buffer.putShort((short) slice.length());
buffer.put(slice.getBytes());
buffer.put((byte) 0);
}
else {
buffer.put(slice.getBytes());
}
break;
case INT:
int intValue = toIntExact((long) value);
if (isComposite) {
buffer.putShort((short) Integer.BYTES);
buffer.putInt(intValue);
buffer.put((byte) 0);
}
else {
buffer.putInt(intValue);
}
break;
case BIGINT:
if (isComposite) {
buffer.putShort((short) Long.BYTES);
buffer.putLong((long) value);
buffer.put((byte) 0);
}
else {
buffer.putLong((long) value);
}
break;
default:
throw new IllegalStateException("Handling of type " + cassandraType + " is not implemented");
}

map.put(columnHandle, NullableValue.of(cassandraType.trinoType(), value));
if (i > 0) {
stringBuilder.append(" AND ");
}
stringBuilder.append(CassandraCqlUtils.validColumnName(columnHandle.name()));
stringBuilder.append(" = ");
stringBuilder.append(CassandraType.getColumnValueForCql(value, cassandraType));
}
buffer.flip();
byte[] key = new byte[buffer.limit()];
buffer.get(key);
TupleDomain<ColumnHandle> tupleDomain = TupleDomain.fromFixedValues(map);
String partitionId = stringBuilder.toString();
if (uniquePartitionIds.add(partitionId)) {
partitions.add(new CassandraPartition(key, partitionId, tupleDomain, false));
}
}
return partitions.build();
}

public ResultSet execute(String cql)
{
log.debug("Execute cql: %s", cql);
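A worked illustration of the key bytes produced by buildPartitionsFromFilterPrefixes above, for a hypothetical composite partition key (country text, id int) with the values ('us', 7). This is a standalone sketch, not part of the patch; it mirrors the 2-byte length + value + 0x00 end-of-component encoding used in the loop (a single-column key instead gets the raw value bytes with no prefix or terminator):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class CompositeKeyLayoutSketch
    {
        public static void main(String[] args)
        {
            byte[] country = "us".getBytes(StandardCharsets.UTF_8);
            ByteBuffer buffer = ByteBuffer.allocate(64);

            // component 1: TEXT value "us"
            buffer.putShort((short) country.length); // 2-byte length prefix
            buffer.put(country);                     // UTF-8 bytes of the value
            buffer.put((byte) 0);                    // end-of-component marker

            // component 2: INT value 7
            buffer.putShort((short) Integer.BYTES);
            buffer.putInt(7);
            buffer.put((byte) 0);

            buffer.flip();
            byte[] key = new byte[buffer.limit()];
            buffer.get(key);
            // 5 bytes for the text component + 7 for the int component = 12 bytes total
            System.out.println("composite key length: " + key.length);
        }
    }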
@@ -14,10 +14,12 @@
package io.trino.plugin.cassandra;

import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;
import io.trino.spi.type.Type;

import java.util.List;

import static io.trino.plugin.cassandra.util.CassandraCqlUtils.quoteStringLiteral;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

@@ -78,6 +80,22 @@ public static CassandraType primitiveType(Kind kind, Type trinoType)
return new CassandraType(kind, trinoType, ImmutableList.of());
}

public static String getColumnValueForCql(Object object, CassandraType cassandraType)
{
switch (cassandraType.kind()) {
case ASCII:
case TEXT:
case VARCHAR:
return quoteStringLiteral(((Slice) object).toStringUtf8());
case INT:
case BIGINT:
return object.toString();
default:
throw new IllegalStateException("Handling of type " + cassandraType.kind.name()
+ " is not implemented");
}
}

@Override
public String toString()
{
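A brief usage sketch for the getColumnValueForCql helper added above, as it is called from the partition-id string builder in CassandraSession. The CassandraTypes.TEXT and CassandraTypes.BIGINT constants are assumed here for illustration; only the method signature shown in this diff is relied on:

    Slice name = Slices.utf8Slice("alice"); // io.airlift.slice.Slices
    String textLiteral = CassandraType.getColumnValueForCql(name, CassandraTypes.TEXT);    // 'alice'
    String bigintLiteral = CassandraType.getColumnValueForCql(42L, CassandraTypes.BIGINT); // 42
    // CassandraSession joins such literals into a partition id of the form:
    //   name = 'alice' AND id = 42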
@@ -103,7 +103,8 @@ public CassandraServer(DockerImageName imageName, Map<String, String> environmen
CASSANDRA_TYPE_MANAGER,
JsonCodec.listJsonCodec(ExtraColumnMetadata.class),
cqlSessionBuilder::build,
new Duration(1, MINUTES));
new Duration(1, MINUTES),
false);
}

private static String prepareCassandraYaml(String fileName)
@@ -65,7 +65,8 @@ public void testDefaults()
.setKeystorePath(null)
.setKeystorePassword(null)
.setTruststorePath(null)
.setTruststorePassword(null));
.setTruststorePassword(null)
.setSkipPartitionCheck(false));
}

@Test
@@ -104,6 +105,7 @@ public void testExplicitPropertyMappings()
.put("cassandra.tls.keystore-password", "keystore-password")
.put("cassandra.tls.truststore-path", truststoreFile.toString())
.put("cassandra.tls.truststore-password", "truststore-password")
.put("cassandra.skip-partition-check", "true")
.buildOrThrow();

CassandraClientConfig expected = new CassandraClientConfig()
@@ -134,7 +136,8 @@ public void testExplicitPropertyMappings()
.setKeystorePath(keystoreFile.toFile())
.setKeystorePassword("keystore-password")
.setTruststorePath(truststoreFile.toFile())
.setTruststorePassword("truststore-password");
.setTruststorePassword("truststore-password")
.setSkipPartitionCheck(true);

assertFullMapping(properties, expected);
}