From dfbfe2965a6b3232a8571a491269ff705353f0ad Mon Sep 17 00:00:00 2001 From: abicky Date: Wed, 3 Oct 2018 00:01:56 +0900 Subject: [PATCH 01/12] Bundle trino-cassandra for local development --- testing/trino-server-dev/etc/catalog/cassandra.properties | 1 + 1 file changed, 1 insertion(+) diff --git a/testing/trino-server-dev/etc/catalog/cassandra.properties b/testing/trino-server-dev/etc/catalog/cassandra.properties index f61e1fe3eb49..003dfa6c64a9 100644 --- a/testing/trino-server-dev/etc/catalog/cassandra.properties +++ b/testing/trino-server-dev/etc/catalog/cassandra.properties @@ -3,3 +3,4 @@ connector.name=cassandra cassandra.contact-points=localhost cassandra.allow-drop-table=true cassandra.load-policy.dc-aware.local-dc=datacenter1 +cassandra.contact-points=localhost From 6aab088f61989184c68816cc4984683d611891f3 Mon Sep 17 00:00:00 2001 From: abicky Date: Wed, 10 Oct 2018 23:45:10 +0900 Subject: [PATCH 02/12] Introduce cassandra.skip-parition-check NativeCassandraSession#getPartitions is very slow if there are many partitions used in WHERE clause. In our use, the partitions usually exist, so introduce cassandra.skip-parition-check property to reduce planning time. --- .../cassandra/CassandraClientConfig.java | 13 ++ .../cassandra/CassandraClientModule.java | 3 +- .../plugin/cassandra/CassandraSession.java | 92 ++++++++- .../trino/plugin/cassandra/CassandraType.java | 18 ++ .../plugin/cassandra/CassandraServer.java | 14 +- .../cassandra/TestCassandraClientConfig.java | 7 +- .../cassandra/TestCassandraSession.java | 185 ++++++++++++++++++ .../plugin/cassandra/TestingScyllaServer.java | 3 +- 8 files changed, 329 insertions(+), 6 deletions(-) create mode 100644 plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraSession.java diff --git a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraClientConfig.java b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraClientConfig.java index c476eda31514..87f120d13ab4 100644 --- a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraClientConfig.java +++ b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraClientConfig.java @@ -83,6 +83,7 @@ public class CassandraClientConfig private String keystorePassword; private File truststorePath; private String truststorePassword; + private boolean skipPartitionCheck; @NotNull @Size(min = 1) @@ -441,4 +442,16 @@ public CassandraClientConfig setTruststorePassword(String truststorePassword) this.truststorePassword = truststorePassword; return this; } + + public boolean isSkipPartitionCheck() + { + return skipPartitionCheck; + } + + @Config("cassandra.skip-partition-check") + public CassandraClientConfig setSkipPartitionCheck(boolean skipPartitionCheck) + { + this.skipPartitionCheck = skipPartitionCheck; + return this; + } } diff --git a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraClientModule.java b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraClientModule.java index 0ac8a8ce7861..234fc347f166 100644 --- a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraClientModule.java +++ b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraClientModule.java @@ -190,7 +190,8 @@ public static CassandraSession createCassandraSession( CassandraTelemetry cassandraTelemetry = CassandraTelemetry.create(openTelemetry); return cassandraTelemetry.wrap(cqlSessionBuilder.build()); }, - config.getNoHostAvailableRetryTimeout()); + config.getNoHostAvailableRetryTimeout(), + config.isSkipPartitionCheck()); } private static Optional buildSslContext( diff --git a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraSession.java b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraSession.java index 216a073e23ec..0914ad159a0b 100644 --- a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraSession.java +++ b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraSession.java @@ -49,6 +49,7 @@ import com.google.errorprone.annotations.concurrent.GuardedBy; import io.airlift.json.JsonCodec; import io.airlift.log.Logger; +import io.airlift.slice.Slice; import io.airlift.units.Duration; import io.trino.plugin.cassandra.util.CassandraCqlUtils; import io.trino.spi.TrinoException; @@ -87,6 +88,7 @@ import static io.trino.plugin.cassandra.util.CassandraCqlUtils.validSchemaName; import static io.trino.plugin.cassandra.util.CassandraCqlUtils.validTableName; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static java.lang.Math.toIntExact; import static java.lang.String.format; import static java.util.Comparator.comparing; import static java.util.Locale.ENGLISH; @@ -106,6 +108,7 @@ public class CassandraSession private final CassandraTypeManager cassandraTypeManager; private final JsonCodec> extraColumnMetadataCodec; private final Duration noHostAvailableRetryTimeout; + private final boolean skipPartitionCheck; @GuardedBy("this") private Supplier sessionSupplier; @@ -116,12 +119,14 @@ public CassandraSession( CassandraTypeManager cassandraTypeManager, JsonCodec> extraColumnMetadataCodec, Supplier sessionSupplier, - Duration noHostAvailableRetryTimeout) + Duration noHostAvailableRetryTimeout, + boolean skipPartitionCheck) { this.cassandraTypeManager = requireNonNull(cassandraTypeManager, "cassandraTypeManager is null"); this.extraColumnMetadataCodec = requireNonNull(extraColumnMetadataCodec, "extraColumnMetadataCodec is null"); this.noHostAvailableRetryTimeout = requireNonNull(noHostAvailableRetryTimeout, "noHostAvailableRetryTimeout is null"); this.sessionSupplier = requireNonNull(sessionSupplier, "sessionSupplier is null"); + this.skipPartitionCheck = skipPartitionCheck; } private synchronized CqlSession session() @@ -411,6 +416,9 @@ private Optional buildColumnHandle(RelationMetadata table */ public List getPartitions(CassandraTable table, List> filterPrefixes) { + if (skipPartitionCheck) { + return buildPartitionsFromFilterPrefixes(table, filterPrefixes); + } List partitionKeyColumns = table.partitionKeyColumns(); if (filterPrefixes.size() != partitionKeyColumns.size()) { @@ -473,6 +481,88 @@ public List getPartitions(CassandraTable table, List buildPartitionsFromFilterPrefixes(CassandraTable table, List> filterPrefixes) + { + List partitionKeyColumns = table.getPartitionKeyColumns(); + + if (filterPrefixes.size() != partitionKeyColumns.size()) { + return ImmutableList.of(CassandraPartition.UNPARTITIONED); + } + + ByteBuffer buffer = ByteBuffer.allocate(1000); + HashMap map = new HashMap<>(); + Set uniquePartitionIds = new HashSet<>(); + StringBuilder stringBuilder = new StringBuilder(); + + boolean isComposite = partitionKeyColumns.size() > 1; + + ImmutableList.Builder partitions = ImmutableList.builder(); + for (List values : Sets.cartesianProduct(filterPrefixes)) { + buffer.clear(); + map.clear(); + stringBuilder.setLength(0); + for (int i = 0; i < partitionKeyColumns.size(); i++) { + Object value = values.get(i); + CassandraColumnHandle columnHandle = partitionKeyColumns.get(i); + CassandraType cassandraType = columnHandle.getCassandraType(); + + switch (cassandraType.getKind()) { + case TEXT: + Slice slice = (Slice) value; + if (isComposite) { + buffer.putShort((short) slice.length()); + buffer.put(slice.getBytes()); + buffer.put((byte) 0); + } + else { + buffer.put(slice.getBytes()); + } + break; + case INT: + int intValue = toIntExact((long) value); + if (isComposite) { + buffer.putShort((short) Integer.BYTES); + buffer.putInt(intValue); + buffer.put((byte) 0); + } + else { + buffer.putInt(intValue); + } + break; + case BIGINT: + if (isComposite) { + buffer.putShort((short) Long.BYTES); + buffer.putLong((long) value); + buffer.put((byte) 0); + } + else { + buffer.putLong((long) value); + } + break; + default: + throw new IllegalStateException("Handling of type " + cassandraType + " is not implemented"); + } + + map.put(columnHandle, NullableValue.of(cassandraType.getTrinoType(), value)); + if (i > 0) { + stringBuilder.append(" AND "); + } + stringBuilder.append(CassandraCqlUtils.validColumnName(columnHandle.getName())); + stringBuilder.append(" = "); + stringBuilder.append(CassandraType.getColumnValueForCql(value, cassandraType)); + } + buffer.flip(); + byte[] key = new byte[buffer.limit()]; + buffer.get(key); + TupleDomain tupleDomain = TupleDomain.fromFixedValues(map); + String partitionId = stringBuilder.toString(); + if (uniquePartitionIds.add(partitionId)) { + partitions.add(new CassandraPartition(key, partitionId, tupleDomain, false)); + } + } + return partitions.build(); + } + public ResultSet execute(String cql) { log.debug("Execute cql: %s", cql); diff --git a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraType.java b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraType.java index 848725aaf196..f12236d3857d 100644 --- a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraType.java +++ b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraType.java @@ -14,10 +14,12 @@ package io.trino.plugin.cassandra; import com.google.common.collect.ImmutableList; +import io.airlift.slice.Slice; import io.trino.spi.type.Type; import java.util.List; +import static io.trino.plugin.cassandra.util.CassandraCqlUtils.quoteStringLiteral; import static java.lang.String.format; import static java.util.Objects.requireNonNull; @@ -78,6 +80,22 @@ public static CassandraType primitiveType(Kind kind, Type trinoType) return new CassandraType(kind, trinoType, ImmutableList.of()); } + public static String getColumnValueForCql(Object object, CassandraType cassandraType) + { + switch (cassandraType.getKind()) { + case ASCII: + case TEXT: + case VARCHAR: + return quoteStringLiteral(((Slice) object).toStringUtf8()); + case INT: + case BIGINT: + return object.toString(); + default: + throw new IllegalStateException("Handling of type " + cassandraType.kind.name() + + " is not implemented"); + } + } + @Override public String toString() { diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/CassandraServer.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/CassandraServer.java index 8ac5e08c82a9..daee151619ed 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/CassandraServer.java +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/CassandraServer.java @@ -103,7 +103,19 @@ public CassandraServer(DockerImageName imageName, Map environmen CASSANDRA_TYPE_MANAGER, JsonCodec.listJsonCodec(ExtraColumnMetadata.class), cqlSessionBuilder::build, - new Duration(1, MINUTES)); + new Duration(1, MINUTES), + false); + + try { + checkConnectivity(session); + } + catch (RuntimeException e) { + session.close(); + this.dockerContainer.stop(); + throw e; + } + + this.session = session; } private static String prepareCassandraYaml(String fileName) diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraClientConfig.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraClientConfig.java index ac6619239d49..d7924c927da5 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraClientConfig.java +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraClientConfig.java @@ -65,7 +65,8 @@ public void testDefaults() .setKeystorePath(null) .setKeystorePassword(null) .setTruststorePath(null) - .setTruststorePassword(null)); + .setTruststorePassword(null) + .setSkipPartitionCheck(false)); } @Test @@ -104,6 +105,7 @@ public void testExplicitPropertyMappings() .put("cassandra.tls.keystore-password", "keystore-password") .put("cassandra.tls.truststore-path", truststoreFile.toString()) .put("cassandra.tls.truststore-password", "truststore-password") + .put("cassandra.skip-partition-check", "true") .buildOrThrow(); CassandraClientConfig expected = new CassandraClientConfig() @@ -134,7 +136,8 @@ public void testExplicitPropertyMappings() .setKeystorePath(keystoreFile.toFile()) .setKeystorePassword("keystore-password") .setTruststorePath(truststoreFile.toFile()) - .setTruststorePassword("truststore-password"); + .setTruststorePassword("truststore-password") + .setSkipPartitionCheck(true); assertFullMapping(properties, expected); } diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraSession.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraSession.java new file mode 100644 index 000000000000..b8049fcc0258 --- /dev/null +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraSession.java @@ -0,0 +1,185 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.cassandra; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.CqlSessionBuilder; +import com.datastax.oss.driver.api.core.ProtocolVersion; +import com.datastax.oss.driver.api.core.config.DriverConfigLoader; +import com.datastax.oss.driver.api.core.config.ProgrammaticDriverConfigLoaderBuilder; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import io.airlift.json.JsonCodec; +import io.airlift.slice.Slices; +import io.airlift.units.Duration; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Set; + +import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT; +import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.PROTOCOL_VERSION; +import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.REQUEST_TIMEOUT; +import static io.trino.plugin.cassandra.CassandraTestingUtils.CASSANDRA_TYPE_MANAGER; +import static io.trino.plugin.cassandra.CassandraTestingUtils.createKeyspace; +import static java.lang.String.format; +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.testng.Assert.assertEquals; + +@Test(singleThreaded = true) +public class TestCassandraSession +{ + private static final String KEYSPACE = "test_native_cassandra_session_keyspace"; + private static final int FILTER_PARTITION_COUNT = 5; + private static final int EXISTING_PARTITION_COUNT = 4; + private static final int CLUSTERING_KEY_COUNT = 3; + + private CassandraServer server; + private CassandraSession session; + + @BeforeClass + public void setUp() + throws Exception + { + this.server = new CassandraServer(); + session = server.getSession(); + createKeyspace(session, KEYSPACE); + } + + @Test + public void testGetPartitionsFromSingleParitionKeyTable() + { + CassandraSession nativeSession = buildNativeSession(false); + String tableName = "single_part_key_table"; + CassandraTable table = createSinglePartitionKeyTable(tableName); + + ImmutableList> partitionKeysList = buildSinglePartitionKeysList(); + List partitions = nativeSession.getPartitions(table, partitionKeysList); + + assertEquals(partitions.size(), EXISTING_PARTITION_COUNT); + session.execute(format("DROP TABLE %s.%s", KEYSPACE, tableName)); + } + + @Test + public void testGetPartitionsFromSinglePartitionKeyTableWithSkipPartitionCheck() + { + CassandraSession nativeSession = buildNativeSession(true); + String tableName = "single_part_key_with_skip_partition_check_table"; + CassandraTable table = createSinglePartitionKeyTable(tableName); + + ImmutableList> partitionKeysList = buildSinglePartitionKeysList(); + List partitions = nativeSession.getPartitions(table, partitionKeysList); + + assertEquals(partitions.size(), FILTER_PARTITION_COUNT); + session.execute(format("DROP TABLE %s.%s", KEYSPACE, tableName)); + } + + @Test + public void testGetPartitionsFromMultipleParitionKeyTable() + { + CassandraSession nativeSession = buildNativeSession(false); + String tableName = "multi_part_key_table"; + CassandraTable table = createMultiplePartitionKeyTable(tableName); + + ImmutableList> partitionKeysList = buildMultiplePartitionKeysList(); + List partitions = nativeSession.getPartitions(table, partitionKeysList); + + assertEquals(partitions.size(), EXISTING_PARTITION_COUNT); + session.execute(format("DROP TABLE %s.%s", KEYSPACE, tableName)); + } + + @Test + public void testGetPartitionsFromMultiplePartitionKeyTableWithSkipPartitionCheck() + { + CassandraSession nativeSession = buildNativeSession(true); + String tableName = "multi_part_key_with_skip_partition_check_table"; + CassandraTable table = createMultiplePartitionKeyTable(tableName); + + ImmutableList> partitionKeysList = buildMultiplePartitionKeysList(); + List partitions = nativeSession.getPartitions(table, partitionKeysList); + + assertEquals(partitions.size(), FILTER_PARTITION_COUNT * FILTER_PARTITION_COUNT); + session.execute(format("DROP TABLE %s.%s", KEYSPACE, tableName)); + } + + private CassandraSession buildNativeSession(boolean skipPartitionCheck) + { + ProgrammaticDriverConfigLoaderBuilder driverConfigLoaderBuilder = DriverConfigLoader.programmaticBuilder(); + driverConfigLoaderBuilder.withDuration(REQUEST_TIMEOUT, java.time.Duration.ofSeconds(12)); + driverConfigLoaderBuilder.withString(PROTOCOL_VERSION, ProtocolVersion.V3.name()); + driverConfigLoaderBuilder.withDuration(CONTROL_CONNECTION_AGREEMENT_TIMEOUT, java.time.Duration.ofSeconds(30)); + CqlSessionBuilder cqlSessionBuilder = CqlSession.builder() + .withApplicationName("TestCluster") + .addContactPoint(new InetSocketAddress(server.getHost(), server.getPort())) + .withLocalDatacenter("datacenter1") + .withConfigLoader(driverConfigLoaderBuilder.build()); + return new CassandraSession( + CASSANDRA_TYPE_MANAGER, + JsonCodec.listJsonCodec(ExtraColumnMetadata.class), + cqlSessionBuilder::build, + new Duration(1, MINUTES), + skipPartitionCheck); + } + + private ImmutableList> buildSinglePartitionKeysList() + { + ImmutableSet.Builder partitionColumnValues = ImmutableSet.builder(); + for (int i = 0; i < FILTER_PARTITION_COUNT; i++) { + partitionColumnValues.add((long) i); + } + return ImmutableList.of(partitionColumnValues.build()); + } + + private CassandraTable createSinglePartitionKeyTable(String tableName) + { + session.execute(format("CREATE TABLE %s.%s (partition_key1 bigint, clustering_key1 bigint, PRIMARY KEY (partition_key1, clustering_key1))", KEYSPACE, tableName)); + for (int i = 0; i < EXISTING_PARTITION_COUNT; i++) { + for (int j = 0; j < CLUSTERING_KEY_COUNT; j++) { + session.execute(format("INSERT INTO %s.%s (partition_key1, clustering_key1) VALUES (%d, %d)", KEYSPACE, tableName, i, j)); + } + } + + CassandraColumnHandle col1 = new CassandraColumnHandle("partition_key1", 1, CassandraTypes.BIGINT, true, false, false, false); + CassandraColumnHandle col2 = new CassandraColumnHandle("clustering_key1", 2, CassandraTypes.BIGINT, false, true, false, false); + return new CassandraTable(new CassandraNamedRelationHandle(KEYSPACE, tableName), ImmutableList.of(col1, col2)); + } + + private ImmutableList> buildMultiplePartitionKeysList() + { + ImmutableSet.Builder col1Values = ImmutableSet.builder(); + ImmutableSet.Builder col2Values = ImmutableSet.builder(); + for (int i = 0; i < FILTER_PARTITION_COUNT; i++) { + col1Values.add((long) i); + col2Values.add(Slices.utf8Slice(Integer.toString(i))); + } + return ImmutableList.of(col1Values.build(), col2Values.build()); + } + + private CassandraTable createMultiplePartitionKeyTable(String tableName) + { + session.execute(format("CREATE TABLE %s.%s (partition_key1 bigint, partition_key2 text, clustering_key1 bigint, PRIMARY KEY ((partition_key1, partition_key2), clustering_key1))", KEYSPACE, tableName)); + for (int i = 0; i < EXISTING_PARTITION_COUNT; i++) { + for (int j = 0; j < CLUSTERING_KEY_COUNT; j++) { + session.execute(format("INSERT INTO %s.%s (partition_key1, partition_key2, clustering_key1) VALUES (%d, '%s', %d)", KEYSPACE, tableName, i, Integer.toString(i), j)); + } + } + + CassandraColumnHandle col1 = new CassandraColumnHandle("partition_key1", 1, CassandraTypes.BIGINT, true, false, false, false); + CassandraColumnHandle col2 = new CassandraColumnHandle("partition_key2", 2, CassandraTypes.TEXT, true, false, false, false); + CassandraColumnHandle col3 = new CassandraColumnHandle("clustering_key1", 3, CassandraTypes.BIGINT, false, true, false, false); + return new CassandraTable(new CassandraNamedRelationHandle(KEYSPACE, tableName), ImmutableList.of(col1, col2, col3)); + } +} diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestingScyllaServer.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestingScyllaServer.java index 4fc1c4c7456a..6a9d538a638c 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestingScyllaServer.java +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestingScyllaServer.java @@ -78,7 +78,8 @@ public TestingScyllaServer(String version) CASSANDRA_TYPE_MANAGER, JsonCodec.listJsonCodec(ExtraColumnMetadata.class), cqlSessionBuilder::build, - new Duration(1, MINUTES)); + new Duration(1, MINUTES), + false); } public CassandraSession getSession() From 49554a7cb7d390a904b54ada8647bb738911fc79 Mon Sep 17 00:00:00 2001 From: abicky Date: Wed, 31 Oct 2018 21:40:39 +0900 Subject: [PATCH 03/12] Support ASCII and VARCHAR --- .../main/java/io/trino/plugin/cassandra/CassandraSession.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraSession.java b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraSession.java index 0914ad159a0b..fcbafeb12e53 100644 --- a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraSession.java +++ b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraSession.java @@ -507,7 +507,9 @@ private List buildPartitionsFromFilterPrefixes(CassandraTabl CassandraType cassandraType = columnHandle.getCassandraType(); switch (cassandraType.getKind()) { + case ASCII: case TEXT: + case VARCHAR: Slice slice = (Slice) value; if (isComposite) { buffer.putShort((short) slice.length()); From ee8ac6d415fca92b183a8a078ebecf7a55835f83 Mon Sep 17 00:00:00 2001 From: abicky Date: Mon, 16 Nov 2020 21:12:05 +0900 Subject: [PATCH 04/12] Run CI on branches repro-* --- .github/workflows/ci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f92c551677d3..d49d4f3be481 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -4,6 +4,7 @@ on: push: branches: - master + - 'repro-*' pull_request: paths-ignore: - 'docs/**' From 9ddee032bcc08e71b92b01c54d5fb278d11a5f6d Mon Sep 17 00:00:00 2001 From: Kenji Okimoto Date: Tue, 6 Feb 2024 13:37:03 +0900 Subject: [PATCH 05/12] Add GitHub Actions workflow to build repro-cassandra jar --- .github/workflows/trino-cassandra.yml | 61 +++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 .github/workflows/trino-cassandra.yml diff --git a/.github/workflows/trino-cassandra.yml b/.github/workflows/trino-cassandra.yml new file mode 100644 index 000000000000..9f7f904a6075 --- /dev/null +++ b/.github/workflows/trino-cassandra.yml @@ -0,0 +1,61 @@ +name: Build trino-cassandra + +on: + push: + branches: + - 'repro-*' + +env: + # An envar that signals to tests we are executing in the CI environment + CONTINUOUS_INTEGRATION: true + MAVEN_OPTS: "-Xmx1024M -XX:+ExitOnOutOfMemoryError" + MAVEN_INSTALL_OPTS: "-Xmx2G -XX:+ExitOnOutOfMemoryError" + MAVEN_FAST_INSTALL: "-B -V --quiet -T 1C -DskipTests -Dair.check.skip-all -Dmaven.javadoc.skip=true" + MAVEN_TEST: "-B -Dair.check.skip-all -Dmaven.javadoc.skip=true -DLogTestDurationListener.enabled=true --fail-at-end" + RETRY: .github/bin/retry + +jobs: + trino-cassandra: + runs-on: ubuntu-latest + permissions: + id-token: write + contents: read + steps: + - uses: aws-actions/configure-aws-credentials@v4 + with: + aws-region: ap-northeast-1 + role-to-assume: ${{ vars.ROLE_TO_ASSUME }} + - uses: actions/checkout@v3 + - uses: actions/setup-java@v3 + with: + distribution: corretto + java-version: 11 + - name: Cache local Maven repository + id: cache-maven + uses: actions/cache@v2 + with: + path: ~/.m2/repository + key: ${{ runner.os }}-maven-2-${{ hashFiles('**/pom.xml') }} + restore-keys: | + ${{ runner.os }}-maven-2- + - name: Populate maven cache + if: steps.cache-maven.outputs.cache-hit != 'true' + run: ./mvnw de.qaware.maven:go-offline-maven-plugin:resolve-dependencies + - name: Maven Install + run: | + export MAVEN_OPTS="${MAVEN_INSTALL_OPTS}" + ./mvnw install ${MAVEN_FAST_INSTALL} -am -pl plugin/trino-cassandra + - name: Upload trino-cassandra.jar to S3 + run: | + trino_cassandra_jar=trino-cassandra-$(yq .project.version pom.xml).jar + aws s3api put-object \ + --bucket ${{ vars.DEPLOY_S3_BUCKET }} \ + --key "${{ vars.DEPLOY_S3_KEY_PREFIX }}/${{ github.ref_name }}/${{ github.run_number }}/${trino_cassandra_jar}" \ + --body "./plugin/trino-cassandra/target/${trino_cassandra_jar}" + - name: Upload trino-cassandra-services.jar + run: | + trino_cassandra_services_jar=trino-cassandra-$(yq .project.version pom.xml)-services.jar + aws s3api put-object \ + --bucket ${{ vars.DEPLOY_S3_BUCKET }} \ + --key "${{ vars.DEPLOY_S3_KEY_PREFIX }}/${{ github.ref_name }}/${{ github.run_number }}/${trino_cassandra_services_jar}" \ + --body "./plugin/trino-cassandra/target/${trino_cassandra_services_jar}" From 69a440436d46507e266b01d053b1011305df30e8 Mon Sep 17 00:00:00 2001 From: Kenji Okimoto Date: Tue, 6 Feb 2024 17:11:03 +0900 Subject: [PATCH 06/12] Update to Java 17 --- .github/workflows/trino-cassandra.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/trino-cassandra.yml b/.github/workflows/trino-cassandra.yml index 9f7f904a6075..da532dc7e973 100644 --- a/.github/workflows/trino-cassandra.yml +++ b/.github/workflows/trino-cassandra.yml @@ -29,7 +29,7 @@ jobs: - uses: actions/setup-java@v3 with: distribution: corretto - java-version: 11 + java-version: 17 - name: Cache local Maven repository id: cache-maven uses: actions/cache@v2 From f6156fca2e215bd4d286a1db39105e0dfd5c30d2 Mon Sep 17 00:00:00 2001 From: Toshihito Kon Date: Thu, 4 Sep 2025 14:06:12 +0900 Subject: [PATCH 07/12] update actions version --- .github/workflows/trino-cassandra.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/trino-cassandra.yml b/.github/workflows/trino-cassandra.yml index da532dc7e973..409d1e5f018b 100644 --- a/.github/workflows/trino-cassandra.yml +++ b/.github/workflows/trino-cassandra.yml @@ -21,18 +21,18 @@ jobs: id-token: write contents: read steps: - - uses: aws-actions/configure-aws-credentials@v4 + - uses: aws-actions/configure-aws-credentials@v5 with: aws-region: ap-northeast-1 role-to-assume: ${{ vars.ROLE_TO_ASSUME }} - - uses: actions/checkout@v3 - - uses: actions/setup-java@v3 + - uses: actions/checkout@v5 + - uses: actions/setup-java@v5 with: distribution: corretto java-version: 17 - name: Cache local Maven repository id: cache-maven - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: ~/.m2/repository key: ${{ runner.os }}-maven-2-${{ hashFiles('**/pom.xml') }} From f82569015235576833413f3c4baa1c603f02ce9a Mon Sep 17 00:00:00 2001 From: Toshihito Kon Date: Thu, 4 Sep 2025 14:15:56 +0900 Subject: [PATCH 08/12] update java version 17 to 21 --- .github/workflows/trino-cassandra.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/trino-cassandra.yml b/.github/workflows/trino-cassandra.yml index 409d1e5f018b..6fddb16d1edb 100644 --- a/.github/workflows/trino-cassandra.yml +++ b/.github/workflows/trino-cassandra.yml @@ -29,7 +29,7 @@ jobs: - uses: actions/setup-java@v5 with: distribution: corretto - java-version: 17 + java-version: 21 - name: Cache local Maven repository id: cache-maven uses: actions/cache@v4 From a9dfdec0949d8450a5e67703accdb1933afdcd3b Mon Sep 17 00:00:00 2001 From: Toshihito Kon Date: Thu, 4 Sep 2025 18:01:38 +0900 Subject: [PATCH 09/12] fix: migrate TestCassandraSession from TestNG to JUnit Jupiter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Convert TestNG annotations (@Test, @BeforeClass) to JUnit Jupiter (@Test, @BeforeAll) - Replace TestNG assertEquals with JUnit Jupiter Assertions.assertEquals - Update static method requirements for @BeforeAll - Remove deprecated checkConnectivity method call from CassandraServer - Remove duplicate session assignment 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../trino/plugin/cassandra/CassandraServer.java | 11 ----------- .../plugin/cassandra/TestCassandraSession.java | 17 ++++++++--------- 2 files changed, 8 insertions(+), 20 deletions(-) diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/CassandraServer.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/CassandraServer.java index daee151619ed..40ddd64bdfa2 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/CassandraServer.java +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/CassandraServer.java @@ -105,17 +105,6 @@ public CassandraServer(DockerImageName imageName, Map environmen cqlSessionBuilder::build, new Duration(1, MINUTES), false); - - try { - checkConnectivity(session); - } - catch (RuntimeException e) { - session.close(); - this.dockerContainer.stop(); - throw e; - } - - this.session = session; } private static String prepareCassandraYaml(String fileName) diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraSession.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraSession.java index b8049fcc0258..729110b5d0f0 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraSession.java +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraSession.java @@ -23,8 +23,8 @@ import io.airlift.json.JsonCodec; import io.airlift.slice.Slices; import io.airlift.units.Duration; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import java.net.InetSocketAddress; import java.util.List; @@ -37,9 +37,8 @@ import static io.trino.plugin.cassandra.CassandraTestingUtils.createKeyspace; import static java.lang.String.format; import static java.util.concurrent.TimeUnit.MINUTES; -import static org.testng.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; -@Test(singleThreaded = true) public class TestCassandraSession { private static final String KEYSPACE = "test_native_cassandra_session_keyspace"; @@ -47,14 +46,14 @@ public class TestCassandraSession private static final int EXISTING_PARTITION_COUNT = 4; private static final int CLUSTERING_KEY_COUNT = 3; - private CassandraServer server; - private CassandraSession session; + private static CassandraServer server; + private static CassandraSession session; - @BeforeClass - public void setUp() + @BeforeAll + public static void setUp() throws Exception { - this.server = new CassandraServer(); + server = new CassandraServer(); session = server.getSession(); createKeyspace(session, KEYSPACE); } From ac88706178f2c19acbc7e623e77a21d0bfeb6b2f Mon Sep 17 00:00:00 2001 From: Toshihito Kon Date: Fri, 12 Sep 2025 18:26:53 +0900 Subject: [PATCH 10/12] update java version --- .github/workflows/trino-cassandra.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/trino-cassandra.yml b/.github/workflows/trino-cassandra.yml index 6fddb16d1edb..51cb594d07ef 100644 --- a/.github/workflows/trino-cassandra.yml +++ b/.github/workflows/trino-cassandra.yml @@ -3,7 +3,7 @@ name: Build trino-cassandra on: push: branches: - - 'repro-*' + - "repro-*" env: # An envar that signals to tests we are executing in the CI environment @@ -29,7 +29,7 @@ jobs: - uses: actions/setup-java@v5 with: distribution: corretto - java-version: 21 + java-version: 22 - name: Cache local Maven repository id: cache-maven uses: actions/cache@v4 From 5c7d868a61003001bc9c204b12cc396aa9e2000f Mon Sep 17 00:00:00 2001 From: Toshihito Kon Date: Thu, 18 Sep 2025 12:44:13 +0900 Subject: [PATCH 11/12] replace import and fix functions --- .../io/trino/plugin/cassandra/CassandraSession.java | 12 +++++------- .../io/trino/plugin/cassandra/CassandraType.java | 2 +- .../trino/plugin/cassandra/TestCassandraSession.java | 10 +++++----- 3 files changed, 11 insertions(+), 13 deletions(-) diff --git a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraSession.java b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraSession.java index fcbafeb12e53..7d4dae2fbd69 100644 --- a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraSession.java +++ b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraSession.java @@ -57,8 +57,6 @@ import io.trino.spi.connector.SchemaNotFoundException; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.TableNotFoundException; -import io.trino.spi.predicate.NullableValue; -import io.trino.spi.predicate.TupleDomain; import java.io.Closeable; import java.nio.ByteBuffer; @@ -483,7 +481,7 @@ public List getPartitions(CassandraTable table, List buildPartitionsFromFilterPrefixes(CassandraTable table, List> filterPrefixes) { - List partitionKeyColumns = table.getPartitionKeyColumns(); + List partitionKeyColumns = table.partitionKeyColumns(); if (filterPrefixes.size() != partitionKeyColumns.size()) { return ImmutableList.of(CassandraPartition.UNPARTITIONED); @@ -504,9 +502,9 @@ private List buildPartitionsFromFilterPrefixes(CassandraTabl for (int i = 0; i < partitionKeyColumns.size(); i++) { Object value = values.get(i); CassandraColumnHandle columnHandle = partitionKeyColumns.get(i); - CassandraType cassandraType = columnHandle.getCassandraType(); + CassandraType cassandraType = columnHandle.cassandraType(); - switch (cassandraType.getKind()) { + switch (cassandraType.kind()) { case ASCII: case TEXT: case VARCHAR: @@ -545,11 +543,11 @@ private List buildPartitionsFromFilterPrefixes(CassandraTabl throw new IllegalStateException("Handling of type " + cassandraType + " is not implemented"); } - map.put(columnHandle, NullableValue.of(cassandraType.getTrinoType(), value)); + map.put(columnHandle, NullableValue.of(cassandraType.trinoType(), value)); if (i > 0) { stringBuilder.append(" AND "); } - stringBuilder.append(CassandraCqlUtils.validColumnName(columnHandle.getName())); + stringBuilder.append(CassandraCqlUtils.validColumnName(columnHandle.name())); stringBuilder.append(" = "); stringBuilder.append(CassandraType.getColumnValueForCql(value, cassandraType)); } diff --git a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraType.java b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraType.java index f12236d3857d..fbd8f284c045 100644 --- a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraType.java +++ b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraType.java @@ -82,7 +82,7 @@ public static CassandraType primitiveType(Kind kind, Type trinoType) public static String getColumnValueForCql(Object object, CassandraType cassandraType) { - switch (cassandraType.getKind()) { + switch (cassandraType.kind()) { case ASCII: case TEXT: case VARCHAR: diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraSession.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraSession.java index 729110b5d0f0..2bc03f71b197 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraSession.java +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraSession.java @@ -37,7 +37,7 @@ import static io.trino.plugin.cassandra.CassandraTestingUtils.createKeyspace; import static java.lang.String.format; import static java.util.concurrent.TimeUnit.MINUTES; -import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.assertj.core.api.Assertions.assertThat; public class TestCassandraSession { @@ -68,7 +68,7 @@ public void testGetPartitionsFromSingleParitionKeyTable() ImmutableList> partitionKeysList = buildSinglePartitionKeysList(); List partitions = nativeSession.getPartitions(table, partitionKeysList); - assertEquals(partitions.size(), EXISTING_PARTITION_COUNT); + assertThat(partitions.size()).isEqualTo(EXISTING_PARTITION_COUNT); session.execute(format("DROP TABLE %s.%s", KEYSPACE, tableName)); } @@ -82,7 +82,7 @@ public void testGetPartitionsFromSinglePartitionKeyTableWithSkipPartitionCheck() ImmutableList> partitionKeysList = buildSinglePartitionKeysList(); List partitions = nativeSession.getPartitions(table, partitionKeysList); - assertEquals(partitions.size(), FILTER_PARTITION_COUNT); + assertThat(partitions.size()).isEqualTo(FILTER_PARTITION_COUNT); session.execute(format("DROP TABLE %s.%s", KEYSPACE, tableName)); } @@ -96,7 +96,7 @@ public void testGetPartitionsFromMultipleParitionKeyTable() ImmutableList> partitionKeysList = buildMultiplePartitionKeysList(); List partitions = nativeSession.getPartitions(table, partitionKeysList); - assertEquals(partitions.size(), EXISTING_PARTITION_COUNT); + assertThat(partitions.size()).isEqualTo(EXISTING_PARTITION_COUNT); session.execute(format("DROP TABLE %s.%s", KEYSPACE, tableName)); } @@ -110,7 +110,7 @@ public void testGetPartitionsFromMultiplePartitionKeyTableWithSkipPartitionCheck ImmutableList> partitionKeysList = buildMultiplePartitionKeysList(); List partitions = nativeSession.getPartitions(table, partitionKeysList); - assertEquals(partitions.size(), FILTER_PARTITION_COUNT * FILTER_PARTITION_COUNT); + assertThat(partitions.size()).isEqualTo(FILTER_PARTITION_COUNT * FILTER_PARTITION_COUNT); session.execute(format("DROP TABLE %s.%s", KEYSPACE, tableName)); } From efd05dd79ca682024f973c33cc52adde27105ac1 Mon Sep 17 00:00:00 2001 From: Toshihito Kon Date: Thu, 18 Sep 2025 16:34:17 +0900 Subject: [PATCH 12/12] add imports --- .../main/java/io/trino/plugin/cassandra/CassandraSession.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraSession.java b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraSession.java index 7d4dae2fbd69..11eda2a59ce4 100644 --- a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraSession.java +++ b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraSession.java @@ -57,6 +57,8 @@ import io.trino.spi.connector.SchemaNotFoundException; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.TableNotFoundException; +import io.trino.spi.predicate.NullableValue; +import io.trino.spi.predicate.TupleDomain; import java.io.Closeable; import java.nio.ByteBuffer;