From 1e2e43af66b35cdf8bc5f829f8edfe47ae8a672a Mon Sep 17 00:00:00 2001 From: chenjian2664 Date: Wed, 24 Sep 2025 13:56:40 +0800 Subject: [PATCH] Replace `VendedCredentials` with `FileSystemCredentials` Previously, credentials were represented as a `Map` in `VendedCredentials`. The new `FileSystemCredentials` interface allows greater flexibility by letting implementations decide how credentials are structured and represented. --- .../CorruptedDeltaLakeTableHandle.java | 4 +- .../DefaultDeltaLakeFileSystemFactory.java | 28 +++++++++++-- .../plugin/deltalake/DeltaLakeMetadata.java | 42 +++++++------------ .../deltalake/DeltaLakeMetadataFactory.java | 9 +--- .../deltalake/DeltaLakeOutputTableHandle.java | 9 ++-- .../deltalake/DeltaLakePageSinkProvider.java | 2 +- .../deltalake/DeltaLakeTableHandle.java | 33 ++++----------- .../metastore/DeltaMetastoreTable.java | 6 +-- ...ntials.java => FileSystemCredentials.java} | 26 ++++-------- ...HiveMetastoreBackedDeltaLakeMetastore.java | 3 +- .../metastore/VendedCredentialsHandle.java | 8 ++-- .../deltalake/TestDeltaLakeMetadata.java | 3 +- ...DeltaLakeNodeLocalDynamicSplitPruning.java | 9 ++-- .../deltalake/TestDeltaLakePageSink.java | 7 ++-- .../deltalake/TestDeltaLakeSplitManager.java | 18 ++++---- .../deltalake/TestTransactionLogAccess.java | 6 +-- .../deltalake/TestingDeltaLakePlugin.java | 3 +- .../deltalake/TestingDeltaLakeUtils.java | 6 +-- ...aLakeFileBasedTableStatisticsProvider.java | 19 ++++----- .../transactionlog/TestTableSnapshot.java | 3 +- .../checkpoint/TestTransactionLogTail.java | 5 ++- 21 files changed, 108 insertions(+), 141 deletions(-) rename plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/{VendedCredentials.java => FileSystemCredentials.java} (51%) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/CorruptedDeltaLakeTableHandle.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/CorruptedDeltaLakeTableHandle.java index 08ff051a5df9..6c9dc6839d8e 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/CorruptedDeltaLakeTableHandle.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/CorruptedDeltaLakeTableHandle.java @@ -13,7 +13,6 @@ */ package io.trino.plugin.deltalake; -import io.trino.plugin.deltalake.metastore.VendedCredentials; import io.trino.plugin.deltalake.metastore.VendedCredentialsHandle; import io.trino.spi.TrinoException; import io.trino.spi.connector.SchemaTableName; @@ -27,7 +26,6 @@ public record CorruptedDeltaLakeTableHandle( boolean catalogOwned, boolean managed, String location, - Optional vendedCredentials, TrinoException originalException) implements LocatedTableHandle { @@ -47,6 +45,6 @@ public TrinoException createException() @Override public VendedCredentialsHandle toCredentialsHandle() { - return new VendedCredentialsHandle(catalogOwned, managed, location, vendedCredentials.orElse(VendedCredentials.empty())); + return new VendedCredentialsHandle(catalogOwned, managed, location, Optional.empty()); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DefaultDeltaLakeFileSystemFactory.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DefaultDeltaLakeFileSystemFactory.java index ac3749d2a1b9..a2f68667460a 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DefaultDeltaLakeFileSystemFactory.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DefaultDeltaLakeFileSystemFactory.java @@ -13,11 +13,17 @@ */ package io.trino.plugin.deltalake; +import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.plugin.deltalake.metastore.FileSystemCredentials; import io.trino.plugin.deltalake.metastore.VendedCredentialsHandle; +import io.trino.plugin.deltalake.metastore.VendedCredentialsProvider; import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.security.ConnectorIdentity; + +import java.util.Optional; import static java.util.Objects.requireNonNull; @@ -25,22 +31,36 @@ public class DefaultDeltaLakeFileSystemFactory implements DeltaLakeFileSystemFactory { private final TrinoFileSystemFactory fileSystemFactory; + private final VendedCredentialsProvider vendedCredentialsProvider; @Inject - public DefaultDeltaLakeFileSystemFactory(TrinoFileSystemFactory fileSystemFactory) + public DefaultDeltaLakeFileSystemFactory(TrinoFileSystemFactory fileSystemFactory, VendedCredentialsProvider vendedCredentialsProvider) { this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); + this.vendedCredentialsProvider = requireNonNull(vendedCredentialsProvider, "vendedCredentialsProvider is null"); } @Override - public TrinoFileSystem create(ConnectorSession session, VendedCredentialsHandle table) + public TrinoFileSystem create(ConnectorSession session, VendedCredentialsHandle vendedCredentialsHandle) { - return fileSystemFactory.create(session.getIdentity()); + requireNonNull(vendedCredentialsHandle, "vendedCredentialsHandle is null"); + + Optional freshCredentials = vendedCredentialsProvider.getFreshCredentials(vendedCredentialsHandle).vendedCredentials(); + + ConnectorIdentity identity = session.getIdentity(); + ConnectorIdentity identityWithExtraCredentials = ConnectorIdentity.forUser(identity.getUser()) + .withGroups(identity.getGroups()) + .withPrincipal(identity.getPrincipal()) + .withEnabledSystemRoles(identity.getEnabledSystemRoles()) + .withConnectorRole(identity.getConnectorRole()) + .withExtraCredentials(freshCredentials.map(FileSystemCredentials::asExtraCredentials).orElse(ImmutableMap.of())) + .build(); + return fileSystemFactory.create(identityWithExtraCredentials); } @Override public TrinoFileSystem create(ConnectorSession session, String tableLocation) { - return fileSystemFactory.create(session.getIdentity()); + return create(session, VendedCredentialsHandle.empty(tableLocation)); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 1f336bd12576..a65a547beead 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -53,7 +53,6 @@ import io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore; import io.trino.plugin.deltalake.metastore.NotADeltaLakeTableException; import io.trino.plugin.deltalake.metastore.VendedCredentialsHandle; -import io.trino.plugin.deltalake.metastore.VendedCredentialsProvider; import io.trino.plugin.deltalake.procedure.DeltaLakeTableExecuteHandle; import io.trino.plugin.deltalake.procedure.DeltaLakeTableProcedureId; import io.trino.plugin.deltalake.procedure.DeltaTableOptimizeHandle; @@ -472,7 +471,6 @@ public class DeltaLakeMetadata private final Map queriedSnapshots = new ConcurrentHashMap<>(); private final Executor metadataFetchingExecutor; private final TransactionLogReaderFactory transactionLogReaderFactory; - private final VendedCredentialsProvider vendedCredentialsProvider; private record QueriedTable(SchemaTableName schemaTableName, long version) { @@ -503,8 +501,7 @@ public DeltaLakeMetadata( boolean useUniqueTableLocation, boolean allowManagedTableRename, Executor metadataFetchingExecutor, - TransactionLogReaderFactory transactionLogReaderFactory, - VendedCredentialsProvider vendedCredentialsProvider) + TransactionLogReaderFactory transactionLogReaderFactory) { this.metastore = requireNonNull(metastore, "metastore is null"); this.transactionLogAccess = requireNonNull(transactionLogAccess, "transactionLogAccess is null"); @@ -528,7 +525,6 @@ public DeltaLakeMetadata( this.allowManagedTableRename = allowManagedTableRename; this.metadataFetchingExecutor = requireNonNull(metadataFetchingExecutor, "metadataFetchingExecutor is null"); this.transactionLogReaderFactory = requireNonNull(transactionLogReaderFactory, "transactionLogLoaderFactory"); - this.vendedCredentialsProvider = requireNonNull(vendedCredentialsProvider, "vendedCredentialsProvider is null"); } private TableSnapshot getSnapshot(ConnectorSession session, DeltaLakeTableHandle table) @@ -729,18 +725,18 @@ public LocatedTableHandle getTableHandle( } catch (TrinoException e) { if (e.getErrorCode().equals(DELTA_LAKE_INVALID_SCHEMA.toErrorCode())) { - return new CorruptedDeltaLakeTableHandle(tableName, table.catalogOwned(), managed, tableLocation, table.vendedCredentials(), e); + return new CorruptedDeltaLakeTableHandle(tableName, table.catalogOwned(), managed, tableLocation, e); } throw e; } MetadataEntry metadataEntry = logEntries.metadata().orElse(null); if (metadataEntry == null) { - return new CorruptedDeltaLakeTableHandle(tableName, table.catalogOwned(), managed, tableLocation, table.vendedCredentials(), new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + tableSnapshot.getTable())); + return new CorruptedDeltaLakeTableHandle(tableName, table.catalogOwned(), managed, tableLocation, new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + tableSnapshot.getTable())); } ProtocolEntry protocolEntry = logEntries.protocol().orElse(null); if (protocolEntry == null) { - return new CorruptedDeltaLakeTableHandle(tableName, table.catalogOwned(), managed, tableLocation, table.vendedCredentials(), new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Protocol not found in transaction log for " + tableSnapshot.getTable())); + return new CorruptedDeltaLakeTableHandle(tableName, table.catalogOwned(), managed, tableLocation, new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Protocol not found in transaction log for " + tableSnapshot.getTable())); } if (protocolEntry.minReaderVersion() > MAX_READER_VERSION) { LOG.debug("Skip %s because the reader version is unsupported: %d", tableName, protocolEntry.minReaderVersion()); @@ -772,8 +768,7 @@ public LocatedTableHandle getTableHandle( Optional.empty(), Optional.empty(), tableSnapshot.getVersion(), - endVersion.isPresent(), - table.vendedCredentials()); + endVersion.isPresent()); } @Override @@ -1345,7 +1340,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe "Using CREATE [OR REPLACE] TABLE with an existing table content is disallowed, instead use the system.register_table() procedure."); } else { - TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, location, fetchCredentialsForLocation(location)); + TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, location, VendedCredentialsHandle.empty(location)); ProtocolEntry protocolEntry; if (replaceExistingTable) { commitVersion = getMandatoryCurrentVersion(fileSystem, location, tableHandle.getReadVersion()) + 1; @@ -1403,12 +1398,6 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe } } - private VendedCredentialsHandle fetchCredentialsForLocation(String tableLocation) - { - // TODO: update once we support creating managed table - return vendedCredentialsProvider.getFreshCredentials(VendedCredentialsHandle.empty(tableLocation)); - } - public Table buildTable(ConnectorSession session, SchemaTableName schemaTableName, String location, boolean isExternal, Optional tableComment, long version, String schemaString) { Table.Builder tableBuilder = Table.builder() @@ -1572,8 +1561,7 @@ public DeltaLakeOutputTableHandle beginCreateTable( maxFieldId, replace, readVersion, - protocolEntry, - fetchCredentialsForLocation(location)); + protocolEntry); } private Optional getSchemaLocation(Database database) @@ -1722,7 +1710,7 @@ public Optional finishCreateTable( if (handle.readVersion().isEmpty()) { // For CTAS there is no risk of multiple writers racing. Using writer without transaction isolation so we are not limiting support for CTAS to // filesystems for which we have proper implementations of TransactionLogSynchronizers. - transactionLogWriter = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, handle.location(), fetchCredentialsForLocation(handle.location())); + transactionLogWriter = transactionLogWriterFactory.newWriterWithoutTransactionIsolation(session, handle.location(), VendedCredentialsHandle.empty(handle.location())); } else { TrinoFileSystem fileSystem = fileSystemFactory.create(session, location); @@ -1732,7 +1720,7 @@ public Optional finishCreateTable( handle.readVersion().getAsLong(), commitVersion - 1)); } - transactionLogWriter = transactionLogWriterFactory.createFileSystemWriter(session, location, fetchCredentialsForLocation(location)); + transactionLogWriter = transactionLogWriterFactory.createFileSystemWriter(session, location, VendedCredentialsHandle.empty(location)); } appendTableEntries( commitVersion, @@ -1761,7 +1749,7 @@ public Optional finishCreateTable( writeCommitted = true; if (handle.replace() && handle.readVersion().isPresent()) { - writeCheckpointIfNeeded(session, schemaTableName, handle.location(), handle.credentialsHandle(), handle.readVersion().getAsLong(), handle.checkpointInterval(), commitVersion); + writeCheckpointIfNeeded(session, schemaTableName, handle.location(), handle.toCredentialsHandle(), handle.readVersion().getAsLong(), handle.checkpointInterval(), commitVersion); } if (isCollectExtendedStatisticsColumnStatisticsOnWrite(session) && !computedStatistics.isEmpty()) { @@ -1778,7 +1766,7 @@ public Optional finishCreateTable( Optional.empty(), schemaTableName, location, - handle.credentialsHandle(), + handle.toCredentialsHandle(), maxFileModificationTime, computedStatistics, columnNames, @@ -1803,7 +1791,7 @@ public Optional finishCreateTable( // Remove the transaction log entry if the table creation fails if (!writeCommitted) { // TODO perhaps it should happen in a background thread (https://github.com/trinodb/trino/issues/12011) - cleanupFailedWrite(session, handle.credentialsHandle(), dataFileInfos); + cleanupFailedWrite(session, handle.toCredentialsHandle(), dataFileInfos); } if (handle.readVersion().isEmpty()) { Location transactionLogDir = Location.of(getTransactionLogDir(location)); @@ -3524,8 +3512,7 @@ else if (!partitionColumns.contains(column)) { false, Optional.empty(), tableHandle.getReadVersion(), - tableHandle.isTimeTravel(), - tableHandle.getVendedCredentials()); + tableHandle.isTimeTravel()); if (tableHandle.getEnforcedPartitionConstraint().equals(newHandle.getEnforcedPartitionConstraint()) && tableHandle.getNonPartitionConstraint().equals(newHandle.getNonPartitionConstraint()) && @@ -3816,8 +3803,7 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession Optional.empty(), Optional.of(analyzeHandle), handle.getReadVersion(), - handle.isTimeTravel(), - handle.getVendedCredentials()); + handle.isTimeTravel()); TableStatisticsMetadata statisticsMetadata = getStatisticsCollectionMetadata( columnsMetadata.stream().map(DeltaLakeColumnMetadata::columnMetadata).collect(toImmutableList()), analyzeColumnNames.orElse(allColumnNames), diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadataFactory.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadataFactory.java index 63d8d323a952..b6e731e3552b 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadataFactory.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadataFactory.java @@ -20,7 +20,6 @@ import io.trino.metastore.cache.CachingHiveMetastore; import io.trino.plugin.deltalake.metastore.DeltaLakeTableMetadataScheduler; import io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore; -import io.trino.plugin.deltalake.metastore.VendedCredentialsProvider; import io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess; import io.trino.plugin.deltalake.statistics.FileBasedTableStatisticsProvider; import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess; @@ -66,7 +65,6 @@ public class DeltaLakeMetadataFactory private final boolean usingSystemSecurity; private final String trinoVersion; private final TransactionLogReaderFactory transactionLogReaderFactory; - private final VendedCredentialsProvider vendedCredentialsProvider; @Inject public DeltaLakeMetadataFactory( @@ -86,8 +84,7 @@ public DeltaLakeMetadataFactory( NodeVersion nodeVersion, DeltaLakeTableMetadataScheduler metadataScheduler, @ForDeltaLakeMetadata ExecutorService executorService, - TransactionLogReaderFactory transactionLogReaderFactory, - VendedCredentialsProvider vendedCredentialsProvider) + TransactionLogReaderFactory transactionLogReaderFactory) { this.hiveMetastoreFactory = requireNonNull(hiveMetastoreFactory, "hiveMetastore is null"); this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); @@ -116,7 +113,6 @@ public DeltaLakeMetadataFactory( this.metadataFetchingExecutor = new BoundedExecutor(executorService, deltaLakeConfig.getMetadataParallelism()); } this.transactionLogReaderFactory = requireNonNull(transactionLogReaderFactory, "transactionLogLoaderFactory is null"); - this.vendedCredentialsProvider = requireNonNull(vendedCredentialsProvider, "vendedCredentialsProvider is null"); } public DeltaLakeMetadata create(ConnectorIdentity identity) @@ -155,7 +151,6 @@ public DeltaLakeMetadata create(ConnectorIdentity identity) useUniqueTableLocation, allowManagedTableRename, metadataFetchingExecutor, - transactionLogReaderFactory, - vendedCredentialsProvider); + transactionLogReaderFactory); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeOutputTableHandle.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeOutputTableHandle.java index f0da66875442..2849c42b1ff2 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeOutputTableHandle.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeOutputTableHandle.java @@ -43,8 +43,7 @@ public record DeltaLakeOutputTableHandle( OptionalInt maxColumnId, boolean replace, OptionalLong readVersion, - ProtocolEntry protocolEntry, - VendedCredentialsHandle credentialsHandle) + ProtocolEntry protocolEntry) implements ConnectorOutputTableHandle { public DeltaLakeOutputTableHandle @@ -61,7 +60,6 @@ public record DeltaLakeOutputTableHandle( requireNonNull(maxColumnId, "maxColumnId is null"); requireNonNull(readVersion, "readVersion is null"); requireNonNull(protocolEntry, "protocolEntry is null"); - requireNonNull(credentialsHandle, "credentialsHandle is null"); } public List partitionedBy() @@ -71,4 +69,9 @@ public List partitionedBy() .map(DeltaLakeColumnHandle::columnName) .collect(toImmutableList()); } + + public VendedCredentialsHandle toCredentialsHandle() + { + return VendedCredentialsHandle.empty(location); + } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSinkProvider.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSinkProvider.java index 2769056504bb..36af036c33b7 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSinkProvider.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSinkProvider.java @@ -119,7 +119,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa maxPartitionsPerWriter, dataFileInfoCodec, Location.of(tableHandle.location()), - tableHandle.credentialsHandle(), + tableHandle.toCredentialsHandle(), session, stats, trinoVersion, diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableHandle.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableHandle.java index 205cfcec48e9..893113de0b84 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableHandle.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableHandle.java @@ -18,7 +18,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableSet; import io.airlift.units.DataSize; -import io.trino.plugin.deltalake.metastore.VendedCredentials; import io.trino.plugin.deltalake.metastore.VendedCredentialsHandle; import io.trino.plugin.deltalake.transactionlog.MetadataEntry; import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; @@ -72,8 +71,6 @@ public enum WriteType // Used only for validation when config property delta.query-partition-filter-required is enabled. private final Set constraintColumns; - private final Optional vendedCredentials; - @JsonCreator public DeltaLakeTableHandle( @JsonProperty("schemaName") String schemaName, @@ -90,8 +87,7 @@ public DeltaLakeTableHandle( @JsonProperty("updateRowIdColumns") Optional> updateRowIdColumns, @JsonProperty("analyzeHandle") Optional analyzeHandle, @JsonProperty("readVersion") long readVersion, - @JsonProperty("timeTravel") boolean timeTravel, - @JsonProperty("vendedCredentials") Optional vendedCredentials) + @JsonProperty("timeTravel") boolean timeTravel) { this( schemaName, @@ -112,8 +108,7 @@ public DeltaLakeTableHandle( false, Optional.empty(), readVersion, - timeTravel, - vendedCredentials); + timeTravel); } public DeltaLakeTableHandle( @@ -135,8 +130,7 @@ public DeltaLakeTableHandle( boolean isOptimize, Optional maxScannedFileSize, long readVersion, - boolean timeTravel, - Optional vendedCredentials) + boolean timeTravel) { this.schemaName = requireNonNull(schemaName, "schemaName is null"); this.tableName = requireNonNull(tableName, "tableName is null"); @@ -159,7 +153,6 @@ public DeltaLakeTableHandle( this.readVersion = readVersion; this.timeTravel = timeTravel; this.constraintColumns = ImmutableSet.copyOf(requireNonNull(constraintColumns, "constraintColumns is null")); - this.vendedCredentials = requireNonNull(vendedCredentials, "vendedCredentials is null"); } public DeltaLakeTableHandle withProjectedColumns(Set projectedColumns) @@ -183,8 +176,7 @@ public DeltaLakeTableHandle withProjectedColumns(Set proj isOptimize, maxScannedFileSize, readVersion, - timeTravel, - vendedCredentials); + timeTravel); } public DeltaLakeTableHandle forOptimize(boolean recordScannedFiles, DataSize maxScannedFileSize) @@ -208,8 +200,7 @@ public DeltaLakeTableHandle forOptimize(boolean recordScannedFiles, DataSize max true, Optional.of(maxScannedFileSize), readVersion, - timeTravel, - vendedCredentials); + timeTravel); } @Override @@ -256,7 +247,7 @@ public String location() @Override public VendedCredentialsHandle toCredentialsHandle() { - return new VendedCredentialsHandle(false, managed, location, vendedCredentials.orElse(VendedCredentials.empty())); + return new VendedCredentialsHandle(false, managed, location, Optional.empty()); } @JsonProperty @@ -356,12 +347,6 @@ public boolean isTimeTravel() return timeTravel; } - @JsonProperty - public Optional getVendedCredentials() - { - return vendedCredentials; - } - @Override public String toString() { @@ -396,8 +381,7 @@ public boolean equals(Object o) isOptimize == that.isOptimize && Objects.equals(maxScannedFileSize, that.maxScannedFileSize) && readVersion == that.readVersion && - timeTravel == that.timeTravel && - Objects.equals(vendedCredentials, that.vendedCredentials); + timeTravel == that.timeTravel; } @Override @@ -421,7 +405,6 @@ public int hashCode() isOptimize, maxScannedFileSize, readVersion, - timeTravel, - vendedCredentials); + timeTravel); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/DeltaMetastoreTable.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/DeltaMetastoreTable.java index c8e9ebb31af6..43a021556258 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/DeltaMetastoreTable.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/DeltaMetastoreTable.java @@ -15,21 +15,17 @@ import io.trino.spi.connector.SchemaTableName; -import java.util.Optional; - import static java.util.Objects.requireNonNull; public record DeltaMetastoreTable( SchemaTableName schemaTableName, boolean managed, String location, - boolean catalogOwned, - Optional vendedCredentials) + boolean catalogOwned) { public DeltaMetastoreTable { requireNonNull(schemaTableName, "schemaTableName is null"); requireNonNull(location, "location is null"); - requireNonNull(vendedCredentials, "vendedCredentials is null"); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/VendedCredentials.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/FileSystemCredentials.java similarity index 51% rename from plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/VendedCredentials.java rename to plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/FileSystemCredentials.java index 18ed58913b55..0b5670c638af 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/VendedCredentials.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/FileSystemCredentials.java @@ -13,25 +13,17 @@ */ package io.trino.plugin.deltalake.metastore; -import com.google.common.collect.ImmutableMap; - -import java.time.Instant; import java.util.Map; -import java.util.Optional; - -import static java.util.Objects.requireNonNull; -public record VendedCredentials(Optional tableId, Instant expireAt, Map credentials) +public interface FileSystemCredentials { - public VendedCredentials - { - requireNonNull(tableId, "tableId is null"); - requireNonNull(expireAt, "expireAt is null"); - credentials = ImmutableMap.copyOf(credentials); - } + /** + * Returns extra credentials map to be passed to ConnectorIdentity + */ + Map asExtraCredentials(); - public static VendedCredentials empty() - { - return new VendedCredentials(Optional.empty(), Instant.MAX, ImmutableMap.of()); - } + /** + * Returns true if the credentials are still valid to be used. + */ + boolean isValid(); } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/HiveMetastoreBackedDeltaLakeMetastore.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/HiveMetastoreBackedDeltaLakeMetastore.java index 2be3484e2f62..79ebbab2dbc3 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/HiveMetastoreBackedDeltaLakeMetastore.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/HiveMetastoreBackedDeltaLakeMetastore.java @@ -131,8 +131,7 @@ public static DeltaMetastoreTable convertToDeltaMetastoreTable(Table table) new SchemaTableName(table.getDatabaseName(), table.getTableName()), table.getTableType().equals(MANAGED_TABLE.name()), getTableLocation(table), - false, - Optional.empty()); + false); } public static String getTableLocation(Table table) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/VendedCredentialsHandle.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/VendedCredentialsHandle.java index e6cd35477016..83ae9d033bf2 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/VendedCredentialsHandle.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/VendedCredentialsHandle.java @@ -13,6 +13,8 @@ */ package io.trino.plugin.deltalake.metastore; +import java.util.Optional; + import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; @@ -20,7 +22,7 @@ public record VendedCredentialsHandle( boolean catalogOwned, boolean managed, String tableLocation, - VendedCredentials vendedCredentials) + Optional vendedCredentials) { public VendedCredentialsHandle { @@ -34,11 +36,11 @@ public record VendedCredentialsHandle( public static VendedCredentialsHandle empty(String tableLocation) { - return new VendedCredentialsHandle(false, false, tableLocation, VendedCredentials.empty()); + return new VendedCredentialsHandle(false, false, tableLocation, Optional.empty()); } public static VendedCredentialsHandle of(DeltaMetastoreTable table) { - return new VendedCredentialsHandle(table.catalogOwned(), table.managed(), table.location(), table.vendedCredentials().orElse(VendedCredentials.empty())); + return new VendedCredentialsHandle(table.catalogOwned(), table.managed(), table.location(), Optional.empty()); } } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java index 34cc976ebf07..99236e23d545 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java @@ -516,8 +516,7 @@ private static DeltaLakeTableHandle createDeltaLakeTableHandle(Set createConstrainedColumnsTuple( diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeNodeLocalDynamicSplitPruning.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeNodeLocalDynamicSplitPruning.java index cf17bdf7507e..70342f40e114 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeNodeLocalDynamicSplitPruning.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeNodeLocalDynamicSplitPruning.java @@ -27,6 +27,7 @@ import io.trino.parquet.writer.ParquetWriter; import io.trino.parquet.writer.ParquetWriterOptions; import io.trino.plugin.base.metrics.FileFormatDataSourceStats; +import io.trino.plugin.deltalake.metastore.NoOpVendedCredentialsProvider; import io.trino.plugin.deltalake.transactionlog.MetadataEntry; import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; import io.trino.plugin.hive.HiveTransactionHandle; @@ -149,8 +150,7 @@ public void testDynamicSplitPruningOnUnpartitionedTable() Optional.empty(), Optional.empty(), 0, - false, - Optional.empty()), + false), transaction); TupleDomain splitPruningPredicate = TupleDomain.withColumnDomains( @@ -250,8 +250,7 @@ public void testDynamicSplitPruningWithExplicitPartitionFilter() Optional.empty(), Optional.empty(), 0, - false, - Optional.empty()), + false), transaction); // Simulate situations where the dynamic filter (e.g.: while performing a JOIN with another table) reduces considerably @@ -328,7 +327,7 @@ private static ConnectorPageSource createTestingPageSource( { FileFormatDataSourceStats stats = new FileFormatDataSourceStats(); DeltaLakePageSourceProvider provider = new DeltaLakePageSourceProvider( - new DefaultDeltaLakeFileSystemFactory(new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS)), + new DefaultDeltaLakeFileSystemFactory(new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS), new NoOpVendedCredentialsProvider()), stats, PARQUET_READER_CONFIG, deltaLakeConfig, diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java index 734dfab52503..f699f51bc289 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePageSink.java @@ -23,7 +23,7 @@ import io.trino.operator.FlatHashStrategyCompiler; import io.trino.operator.GroupByHashPageIndexerFactory; import io.trino.plugin.base.metrics.FileFormatDataSourceStats; -import io.trino.plugin.deltalake.metastore.VendedCredentialsHandle; +import io.trino.plugin.deltalake.metastore.NoOpVendedCredentialsProvider; import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; import io.trino.plugin.hive.HiveTransactionHandle; import io.trino.plugin.hive.NodeVersion; @@ -181,12 +181,11 @@ private static ConnectorPageSink createPageSink(String outputPath, DeltaLakeWrit OptionalInt.empty(), false, OptionalLong.empty(), - new ProtocolEntry(DEFAULT_READER_VERSION, DEFAULT_WRITER_VERSION, Optional.empty(), Optional.empty()), - VendedCredentialsHandle.empty(outputPath)); + new ProtocolEntry(DEFAULT_READER_VERSION, DEFAULT_WRITER_VERSION, Optional.empty(), Optional.empty())); DeltaLakePageSinkProvider provider = new DeltaLakePageSinkProvider( new GroupByHashPageIndexerFactory(new FlatHashStrategyCompiler(new TypeOperators())), - new DefaultDeltaLakeFileSystemFactory(new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS)), + new DefaultDeltaLakeFileSystemFactory(new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS), new NoOpVendedCredentialsProvider()), JsonCodec.jsonCodec(DataFileInfo.class), JsonCodec.jsonCodec(DeltaLakeMergeResult.class), stats, diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java index d71e266a5668..8f138b960b5d 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java @@ -104,8 +104,7 @@ public class TestDeltaLakeSplitManager Optional.empty(), Optional.empty(), 0, - false, - Optional.empty()); + false); private final HiveTransactionHandle transactionHandle = new HiveTransactionHandle(true); @Test @@ -189,7 +188,7 @@ private DeltaLakeSplitManager setupSplitManager(List addFileEntrie TypeManager typeManager = context.getTypeManager(); HdfsFileSystemFactory hdfsFileSystemFactory = new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS); - DeltaLakeFileSystemFactory fileSystemFactory = new DefaultDeltaLakeFileSystemFactory(hdfsFileSystemFactory); + DeltaLakeFileSystemFactory fileSystemFactory = new DefaultDeltaLakeFileSystemFactory(hdfsFileSystemFactory, new NoOpVendedCredentialsProvider()); TransactionLogAccess transactionLogAccess = new TransactionLogAccess( typeManager, new CheckpointSchemaManager(typeManager), @@ -213,7 +212,7 @@ public Stream getActiveFiles( CheckpointWriterManager checkpointWriterManager = new CheckpointWriterManager( typeManager, new CheckpointSchemaManager(typeManager), - new DefaultDeltaLakeFileSystemFactory(hdfsFileSystemFactory), + new DefaultDeltaLakeFileSystemFactory(hdfsFileSystemFactory, new NoOpVendedCredentialsProvider()), new NodeVersion("test_version"), transactionLogAccess, new FileFormatDataSourceStats(), @@ -225,23 +224,22 @@ public Stream getActiveFiles( HiveMetastoreFactory hiveMetastoreFactory = HiveMetastoreFactory.ofInstance(createTestingFileHiveMetastore(new MemoryFileSystemFactory(), Location.of("memory:///"))); DeltaLakeMetadataFactory metadataFactory = new DeltaLakeMetadataFactory( hiveMetastoreFactory, - new DefaultDeltaLakeFileSystemFactory(hdfsFileSystemFactory), + new DefaultDeltaLakeFileSystemFactory(hdfsFileSystemFactory, new NoOpVendedCredentialsProvider()), transactionLogAccess, typeManager, new DeltaLakeConfig(), JsonCodec.jsonCodec(DataFileInfo.class), JsonCodec.jsonCodec(DeltaLakeMergeResult.class), - new FileSystemTransactionLogWriterFactory(new TransactionLogSynchronizerManager(ImmutableMap.of(), new NoIsolationSynchronizer(new DefaultDeltaLakeFileSystemFactory(hdfsFileSystemFactory)))), + new FileSystemTransactionLogWriterFactory(new TransactionLogSynchronizerManager(ImmutableMap.of(), new NoIsolationSynchronizer(new DefaultDeltaLakeFileSystemFactory(hdfsFileSystemFactory, new NoOpVendedCredentialsProvider())))), CURRENT_NODE, checkpointWriterManager, - new CachingExtendedStatisticsAccess(new MetaDirStatisticsAccess(new DefaultDeltaLakeFileSystemFactory(HDFS_FILE_SYSTEM_FACTORY), new JsonCodecFactory().jsonCodec(ExtendedStatistics.class))), + new CachingExtendedStatisticsAccess(new MetaDirStatisticsAccess(new DefaultDeltaLakeFileSystemFactory(HDFS_FILE_SYSTEM_FACTORY, new NoOpVendedCredentialsProvider()), new JsonCodecFactory().jsonCodec(ExtendedStatistics.class))), true, false, new NodeVersion("test_version"), new DeltaLakeTableMetadataScheduler(CURRENT_NODE, TESTING_TYPE_MANAGER, new DeltaLakeFileMetastoreTableOperationsProvider(hiveMetastoreFactory), Integer.MAX_VALUE, new DeltaLakeConfig()), newDirectExecutorService(), - transactionLogReaderFactory, - new NoOpVendedCredentialsProvider()); + transactionLogReaderFactory); ConnectorSession session = testingConnectorSessionWithConfig(deltaLakeConfig); DeltaLakeTransactionManager deltaLakeTransactionManager = new DeltaLakeTransactionManager(metadataFactory); @@ -252,7 +250,7 @@ public Stream getActiveFiles( transactionLogAccess, newDirectExecutorService(), deltaLakeConfig, - new DefaultDeltaLakeFileSystemFactory(HDFS_FILE_SYSTEM_FACTORY), + new DefaultDeltaLakeFileSystemFactory(HDFS_FILE_SYSTEM_FACTORY, new NoOpVendedCredentialsProvider()), deltaLakeTransactionManager, new DefaultCachingHostAddressProvider()); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java index 375ee2af1e4c..831cf2849a90 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java @@ -25,6 +25,7 @@ import io.trino.filesystem.hdfs.HdfsFileSystemFactory; import io.trino.filesystem.tracing.TracingFileSystemFactory; import io.trino.plugin.base.metrics.FileFormatDataSourceStats; +import io.trino.plugin.deltalake.metastore.NoOpVendedCredentialsProvider; import io.trino.plugin.deltalake.transactionlog.AddFileEntry; import io.trino.plugin.deltalake.transactionlog.MetadataEntry; import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; @@ -108,7 +109,7 @@ public class TestTransactionLogAccess "age=29/part-00000-3794c463-cb0c-4beb-8d07-7cc1e3b5920f.c000.snappy.parquet"); private final TestingTelemetry testingTelemetry = TestingTelemetry.create("transaction-log-access"); - private final DefaultDeltaLakeFileSystemFactory tracingFileSystemFactory = new DefaultDeltaLakeFileSystemFactory(new TracingFileSystemFactory(testingTelemetry.getTracer(), new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS))); + private final DefaultDeltaLakeFileSystemFactory tracingFileSystemFactory = new DefaultDeltaLakeFileSystemFactory(new TracingFileSystemFactory(testingTelemetry.getTracer(), new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS)), new NoOpVendedCredentialsProvider()); private TransactionLogAccess transactionLogAccess; private TableSnapshot tableSnapshot; @@ -600,8 +601,7 @@ public void testIncrementalCacheUpdates() Optional.empty(), Optional.empty(), 0, - false, - Optional.empty()), + false), updatedTableSnapshot, TupleDomain.all(), alwaysTrue())) { diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestingDeltaLakePlugin.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestingDeltaLakePlugin.java index b67daad9650f..616dbbebaa9e 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestingDeltaLakePlugin.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestingDeltaLakePlugin.java @@ -16,6 +16,7 @@ import com.google.inject.Module; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.filesystem.local.LocalFileSystemFactory; +import io.trino.plugin.deltalake.metastore.NoOpVendedCredentialsProvider; import io.trino.plugin.deltalake.transactionlog.writer.LocalTransactionLogSynchronizer; import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogSynchronizer; import io.trino.plugin.hive.metastore.file.FileHiveMetastoreConfig; @@ -76,7 +77,7 @@ public Connector create(String catalogName, Map config, Connecto newMapBinder(binder, String.class, TrinoFileSystemFactory.class) .addBinding("local").toInstance(localFileSystemFactory); newMapBinder(binder, String.class, TransactionLogSynchronizer.class) - .addBinding("local").toInstance(new LocalTransactionLogSynchronizer(new DefaultDeltaLakeFileSystemFactory(localFileSystemFactory))); + .addBinding("local").toInstance(new LocalTransactionLogSynchronizer(new DefaultDeltaLakeFileSystemFactory(localFileSystemFactory, new NoOpVendedCredentialsProvider()))); configBinder(binder).bindConfigDefaults(FileHiveMetastoreConfig.class, defaults -> defaults.setCatalogDirectory("local:///")); }); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestingDeltaLakeUtils.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestingDeltaLakeUtils.java index 4f9b7adaca29..670f3b8fd682 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestingDeltaLakeUtils.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestingDeltaLakeUtils.java @@ -70,8 +70,7 @@ public static DeltaLakeTableHandle createTable(SchemaTableName schemaTableName, Optional.empty(), Optional.empty(), 0, - false, - Optional.empty()); + false); } public static DeltaLakeTableHandle createTable(MetadataEntry metadataEntry, ProtocolEntry protocolEntry) @@ -91,8 +90,7 @@ public static DeltaLakeTableHandle createTable(MetadataEntry metadataEntry, Prot Optional.empty(), Optional.empty(), 0, - false, - Optional.empty()); + false); } public static List getTableActiveFiles(TransactionLogAccess transactionLogAccess, TrinoFileSystemFactory fileSystemFactory, String tableLocation) diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/statistics/TestDeltaLakeFileBasedTableStatisticsProvider.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/statistics/TestDeltaLakeFileBasedTableStatisticsProvider.java index fe3018fb27e1..2b27d68c6268 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/statistics/TestDeltaLakeFileBasedTableStatisticsProvider.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/statistics/TestDeltaLakeFileBasedTableStatisticsProvider.java @@ -22,6 +22,7 @@ import io.trino.plugin.deltalake.DeltaLakeConfig; import io.trino.plugin.deltalake.DeltaLakeSessionProperties; import io.trino.plugin.deltalake.DeltaLakeTableHandle; +import io.trino.plugin.deltalake.metastore.NoOpVendedCredentialsProvider; import io.trino.plugin.deltalake.metastore.VendedCredentialsHandle; import io.trino.plugin.deltalake.transactionlog.MetadataEntry; import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; @@ -87,19 +88,19 @@ public TestDeltaLakeFileBasedTableStatisticsProvider() FileFormatDataSourceStats fileFormatDataSourceStats = new FileFormatDataSourceStats(); - transactionLogReaderFactory = new FileSystemTransactionLogReaderFactory(new DefaultDeltaLakeFileSystemFactory(HDFS_FILE_SYSTEM_FACTORY)); + transactionLogReaderFactory = new FileSystemTransactionLogReaderFactory(new DefaultDeltaLakeFileSystemFactory(HDFS_FILE_SYSTEM_FACTORY, new NoOpVendedCredentialsProvider())); transactionLogAccess = new TransactionLogAccess( typeManager, checkpointSchemaManager, new DeltaLakeConfig(), fileFormatDataSourceStats, - new DefaultDeltaLakeFileSystemFactory(HDFS_FILE_SYSTEM_FACTORY), + new DefaultDeltaLakeFileSystemFactory(HDFS_FILE_SYSTEM_FACTORY, new NoOpVendedCredentialsProvider()), new ParquetReaderConfig(), newDirectExecutorService(), transactionLogReaderFactory); - statistics = new CachingExtendedStatisticsAccess(new MetaDirStatisticsAccess(new DefaultDeltaLakeFileSystemFactory(HDFS_FILE_SYSTEM_FACTORY), new JsonCodecFactory().jsonCodec(ExtendedStatistics.class))); + statistics = new CachingExtendedStatisticsAccess(new MetaDirStatisticsAccess(new DefaultDeltaLakeFileSystemFactory(HDFS_FILE_SYSTEM_FACTORY, new NoOpVendedCredentialsProvider()), new JsonCodecFactory().jsonCodec(ExtendedStatistics.class))); tableStatisticsProvider = new FileBasedTableStatisticsProvider( typeManager, transactionLogAccess, @@ -138,8 +139,7 @@ private DeltaLakeTableHandle registerTable(String tableName, String directoryNam Optional.empty(), Optional.empty(), 0, - false, - Optional.empty()); + false); } @Test @@ -271,8 +271,7 @@ public void testStatisticsMultipleFiles() tableHandle.getUpdateRowIdColumns(), tableHandle.getAnalyzeHandle(), 0, - tableHandle.isTimeTravel(), - Optional.empty()); + tableHandle.isTimeTravel()); stats = getTableStatistics(SESSION, tableHandleWithUnenforcedConstraint); columnStatistics = stats.getColumnStatistics().get(COLUMN_HANDLE); assertThat(columnStatistics.getRange().get().getMin()).isEqualTo(0.0); @@ -298,8 +297,7 @@ public void testStatisticsNoRecords() tableHandle.getUpdateRowIdColumns(), tableHandle.getAnalyzeHandle(), 0, - tableHandle.isTimeTravel(), - Optional.empty()); + tableHandle.isTimeTravel()); DeltaLakeTableHandle tableHandleWithNoneUnenforcedConstraint = new DeltaLakeTableHandle( tableHandle.getSchemaName(), tableHandle.getTableName(), @@ -315,8 +313,7 @@ public void testStatisticsNoRecords() tableHandle.getUpdateRowIdColumns(), tableHandle.getAnalyzeHandle(), 0, - tableHandle.isTimeTravel(), - Optional.empty()); + tableHandle.isTimeTravel()); // If either the table handle's constraint or the provided Constraint are none, it will cause a 0 record count to be reported assertEmptyStats(getTableStatistics(SESSION, tableHandleWithNoneEnforcedConstraint)); assertEmptyStats(getTableStatistics(SESSION, tableHandleWithNoneUnenforcedConstraint)); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java index d95249ecf246..793f78b86ec6 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java @@ -25,6 +25,7 @@ import io.trino.plugin.base.metrics.FileFormatDataSourceStats; import io.trino.plugin.deltalake.DefaultDeltaLakeFileSystemFactory; import io.trino.plugin.deltalake.DeltaLakeConfig; +import io.trino.plugin.deltalake.metastore.NoOpVendedCredentialsProvider; import io.trino.plugin.deltalake.metastore.VendedCredentialsHandle; import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointSchemaManager; import io.trino.plugin.deltalake.transactionlog.checkpoint.LastCheckpoint; @@ -86,7 +87,7 @@ public void setUp() checkpointSchemaManager = new CheckpointSchemaManager(TESTING_TYPE_MANAGER); tableLocation = getClass().getClassLoader().getResource("databricks73/person").toURI().toString(); - tracingFileSystemFactory = new DefaultDeltaLakeFileSystemFactory(new TracingFileSystemFactory(testingTelemetry.getTracer(), new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS))); + tracingFileSystemFactory = new DefaultDeltaLakeFileSystemFactory(new TracingFileSystemFactory(testingTelemetry.getTracer(), new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS)), new NoOpVendedCredentialsProvider()); credentialsHandle = VendedCredentialsHandle.empty(tableLocation); trackingFileSystem = tracingFileSystemFactory.create(SESSION, credentialsHandle); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestTransactionLogTail.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestTransactionLogTail.java index d952407132b9..cee2a1ed9a99 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestTransactionLogTail.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestTransactionLogTail.java @@ -15,6 +15,7 @@ import io.trino.filesystem.hdfs.HdfsFileSystemFactory; import io.trino.plugin.deltalake.DefaultDeltaLakeFileSystemFactory; +import io.trino.plugin.deltalake.metastore.NoOpVendedCredentialsProvider; import io.trino.plugin.deltalake.metastore.VendedCredentialsHandle; import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry; import io.trino.plugin.deltalake.transactionlog.reader.FileSystemTransactionLogReader; @@ -52,7 +53,7 @@ private void testTail(String dataSource) private List updateJsonTransactionLogTails(String tableLocation) throws Exception { - DefaultDeltaLakeFileSystemFactory fileSystemFactory = new DefaultDeltaLakeFileSystemFactory(new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS)); + DefaultDeltaLakeFileSystemFactory fileSystemFactory = new DefaultDeltaLakeFileSystemFactory(new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS), new NoOpVendedCredentialsProvider()); VendedCredentialsHandle credentialsHandle = VendedCredentialsHandle.empty(tableLocation); TransactionLogReader transactionLogReader = new FileSystemTransactionLogReader(tableLocation, credentialsHandle, fileSystemFactory); TransactionLogTail transactionLogTail = transactionLogReader.loadNewTail(SESSION, Optional.of(10L), Optional.of(12L), DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE); @@ -64,7 +65,7 @@ private List updateJsonTransactionLogTails(String private List readJsonTransactionLogTails(String tableLocation) throws Exception { - DefaultDeltaLakeFileSystemFactory fileSystemFactory = new DefaultDeltaLakeFileSystemFactory(new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS)); + DefaultDeltaLakeFileSystemFactory fileSystemFactory = new DefaultDeltaLakeFileSystemFactory(new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS), new NoOpVendedCredentialsProvider()); VendedCredentialsHandle credentialsHandle = VendedCredentialsHandle.empty(tableLocation); TransactionLogReader transactionLogReader = new FileSystemTransactionLogReader(tableLocation, credentialsHandle, fileSystemFactory); TransactionLogTail transactionLogTail = transactionLogReader.loadNewTail(SESSION, Optional.of(10L), Optional.empty(), DEFAULT_TRANSACTION_LOG_MAX_CACHED_SIZE);