diff --git a/core/trino-main/src/main/java/io/trino/metadata/Metadata.java b/core/trino-main/src/main/java/io/trino/metadata/Metadata.java index e931cc74f00..173c5fa83a7 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/Metadata.java +++ b/core/trino-main/src/main/java/io/trino/metadata/Metadata.java @@ -121,7 +121,7 @@ Optional getTableHandleForExecute( void finishTableExecute(Session session, TableExecuteHandle handle, Collection fragments, List tableExecuteState); - void executeTableExecute(Session session, TableExecuteHandle handle); + Map executeTableExecute(Session session, TableExecuteHandle handle); TableProperties getTableProperties(Session session, TableHandle handle); diff --git a/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java b/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java index fa0a9067c14..2f2066fc031 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java +++ b/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java @@ -362,11 +362,11 @@ public void finishTableExecute(Session session, TableExecuteHandle tableExecuteH } @Override - public void executeTableExecute(Session session, TableExecuteHandle tableExecuteHandle) + public Map executeTableExecute(Session session, TableExecuteHandle tableExecuteHandle) { CatalogHandle catalogHandle = tableExecuteHandle.catalogHandle(); ConnectorMetadata metadata = getMetadata(session, catalogHandle); - metadata.executeTableExecute(session.toConnectorSession(catalogHandle), tableExecuteHandle.connectorHandle()); + return metadata.executeTableExecute(session.toConnectorSession(catalogHandle), tableExecuteHandle.connectorHandle()); } @Override diff --git a/core/trino-main/src/main/java/io/trino/operator/SimpleTableExecuteOperator.java b/core/trino-main/src/main/java/io/trino/operator/SimpleTableExecuteOperator.java index e31db7611b2..0d54d991aa7 100644 --- 
a/core/trino-main/src/main/java/io/trino/operator/SimpleTableExecuteOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/SimpleTableExecuteOperator.java @@ -13,20 +13,26 @@ */ package io.trino.operator; +import com.google.common.collect.ImmutableList; +import io.airlift.slice.Slices; import io.trino.Session; import io.trino.metadata.Metadata; import io.trino.metadata.TableExecuteHandle; import io.trino.spi.Page; +import io.trino.spi.PageBuilder; +import io.trino.spi.block.BlockBuilder; +import io.trino.spi.type.Type; import io.trino.sql.planner.plan.PlanNodeId; +import java.util.List; +import java.util.Map; + import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; public class SimpleTableExecuteOperator implements Operator { - private static final Page PAGE = new Page(0); - public static class SimpleTableExecuteOperatorOperatorFactory implements OperatorFactory { @@ -35,6 +41,7 @@ public static class SimpleTableExecuteOperatorOperatorFactory private final Metadata metadata; private final Session session; private final TableExecuteHandle executeHandle; + private final List types; private boolean closed; public SimpleTableExecuteOperatorOperatorFactory( @@ -42,13 +49,15 @@ public SimpleTableExecuteOperatorOperatorFactory( PlanNodeId planNodeId, Metadata metadata, Session session, - TableExecuteHandle executeHandle) + TableExecuteHandle executeHandle, + List types) { this.operatorId = operatorId; this.planNodeId = requireNonNull(planNodeId, "planNodeId is null"); this.metadata = requireNonNull(metadata, "planNodeId is null"); this.session = requireNonNull(session, "planNodeId is null"); this.executeHandle = requireNonNull(executeHandle, "executeHandle is null"); + this.types = ImmutableList.copyOf(requireNonNull(types, "types is null")); } @Override @@ -60,7 +69,8 @@ public Operator createOperator(DriverContext driverContext) context, metadata, session, - executeHandle); + executeHandle, + types); } 
@Override @@ -77,7 +87,8 @@ public OperatorFactory duplicate() planNodeId, metadata, session, - executeHandle); + executeHandle, + types); } } @@ -85,6 +96,7 @@ public OperatorFactory duplicate() private final Metadata metadata; private final Session session; private final TableExecuteHandle executeHandle; + private final List types; private boolean finished; @@ -92,12 +104,14 @@ public SimpleTableExecuteOperator( OperatorContext operatorContext, Metadata metadata, Session session, - TableExecuteHandle executeHandle) + TableExecuteHandle executeHandle, + List types) { this.operatorContext = requireNonNull(operatorContext, "operatorContext is null"); this.metadata = requireNonNull(metadata, "metadata is null"); this.session = requireNonNull(session, "session is null"); this.executeHandle = requireNonNull(executeHandle, "executeHandle is null"); + this.types = ImmutableList.copyOf(requireNonNull(types, "types is null")); } @Override @@ -125,9 +139,17 @@ public Page getOutput() return null; } - metadata.executeTableExecute(session, executeHandle); + Map metrics = metadata.executeTableExecute(session, executeHandle); finished = true; - return PAGE; + PageBuilder pageBuilder = new PageBuilder(types); + BlockBuilder metricNameBuilder = pageBuilder.getBlockBuilder(0); + BlockBuilder metricValueBuilder = pageBuilder.getBlockBuilder(1); + for (Map.Entry entry : metrics.entrySet()) { + types.get(0).writeSlice(metricNameBuilder, Slices.utf8Slice(entry.getKey())); + types.get(1).writeLong(metricValueBuilder, entry.getValue()); + pageBuilder.declarePosition(); + } + return pageBuilder.build(); } @Override diff --git a/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java b/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java index e99c495475e..c0ce5e50bdd 100644 --- a/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java +++ b/core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java @@ -1323,6 
+1323,9 @@ protected Scope visitTableExecute(TableExecute node, Optional scope) analysis.setUpdateType("ALTER TABLE EXECUTE"); analysis.setUpdateTarget(executeHandle.catalogHandle().getVersion(), tableName, Optional.of(table), Optional.empty()); + if (!procedureMetadata.getExecutionMode().isReadsData()) { + return createAndAssignScope(node, scope, Field.newUnqualified("metric_name", VARCHAR), Field.newUnqualified("metric_value", BIGINT)); + } return createAndAssignScope(node, scope, Field.newUnqualified("rows", BIGINT)); } diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java index e58ff61507c..2ffeedc6193 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java @@ -3476,7 +3476,8 @@ public PhysicalOperation visitSimpleTableExecuteNode(SimpleTableExecuteNode node node.getId(), metadata, session, - node.getExecuteHandle()); + node.getExecuteHandle(), + getSymbolTypes(node.getOutputSymbols())); return new PhysicalOperation(operatorFactory, makeLayout(node)); } diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java index 99a0718ef58..8f85a006e15 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java @@ -966,7 +966,9 @@ private RelationPlan createTableExecutePlan(Analysis analysis, TableExecute stat if (!analysis.isTableExecuteReadsData()) { SimpleTableExecuteNode node = new SimpleTableExecuteNode( idAllocator.getNextId(), - symbolAllocator.newSymbol("rows", BIGINT), + ImmutableList.of( + symbolAllocator.newSymbol("metricName", VARCHAR), + symbolAllocator.newSymbol("metricValue", BIGINT)), executeHandle); return new RelationPlan(node, 
analysis.getRootScope(), node.getOutputSymbols(), Optional.empty()); } diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/UnaliasSymbolReferences.java b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/UnaliasSymbolReferences.java index 00160c2ae78..67330bb74b1 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/UnaliasSymbolReferences.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/UnaliasSymbolReferences.java @@ -687,12 +687,11 @@ public PlanAndMappings visitSimpleTableExecuteNode(SimpleTableExecuteNode node, { Map mapping = new HashMap<>(context.getCorrelationMapping()); SymbolMapper mapper = symbolMapper(mapping); - Symbol newOutput = mapper.map(node.getOutput()); return new PlanAndMappings( new SimpleTableExecuteNode( node.getId(), - newOutput, + mapper.map(node.getOutputSymbols()), node.getExecuteHandle()), mapping); } diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/plan/SimpleTableExecuteNode.java b/core/trino-main/src/main/java/io/trino/sql/planner/plan/SimpleTableExecuteNode.java index cba07cbd586..80b3f643ff2 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/plan/SimpleTableExecuteNode.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/plan/SimpleTableExecuteNode.java @@ -27,17 +27,17 @@ public class SimpleTableExecuteNode extends PlanNode { - private final Symbol output; + private final List outputs; private final TableExecuteHandle executeHandle; @JsonCreator public SimpleTableExecuteNode( @JsonProperty("id") PlanNodeId id, - @JsonProperty("output") Symbol output, + @JsonProperty("outputs") List outputs, @JsonProperty("executeHandle") TableExecuteHandle executeHandle) { super(id); - this.output = requireNonNull(output, "output is null"); + this.outputs = ImmutableList.copyOf(requireNonNull(outputs, "outputs is null")); this.executeHandle = requireNonNull(executeHandle, "executeHandle is null"); } @@ -48,16 +48,11 @@ 
public List getSources() return ImmutableList.of(); } + @JsonProperty("outputs") @Override public List getOutputSymbols() { - return ImmutableList.of(output); - } - - @JsonProperty - public Symbol getOutput() - { - return output; + return outputs; } @Override diff --git a/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java b/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java index e4a7523bf58..74b515d3d69 100644 --- a/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java +++ b/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java @@ -181,11 +181,11 @@ public void finishTableExecute(ConnectorSession session, ConnectorTableExecuteHa } @Override - public void executeTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) + public Map executeTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { Span span = startSpan("executeTableExecute", tableExecuteHandle); try (var _ = scopedSpan(span)) { - delegate.executeTableExecute(session, tableExecuteHandle); + return delegate.executeTableExecute(session, tableExecuteHandle); } } diff --git a/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java b/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java index 42e15adf8ff..2b2d0b89b69 100644 --- a/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java +++ b/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java @@ -236,11 +236,11 @@ public void finishTableExecute(Session session, TableExecuteHandle handle, Colle } @Override - public void executeTableExecute(Session session, TableExecuteHandle handle) + public Map executeTableExecute(Session session, TableExecuteHandle handle) { Span span = startSpan("executeTableExecute", handle); try (var _ = scopedSpan(span)) { - delegate.executeTableExecute(session, handle); + return delegate.executeTableExecute(session, handle); } 
} diff --git a/core/trino-main/src/test/java/io/trino/connector/MockConnector.java b/core/trino-main/src/test/java/io/trino/connector/MockConnector.java index 2aa53768333..7ca221e0aba 100644 --- a/core/trino-main/src/test/java/io/trino/connector/MockConnector.java +++ b/core/trino-main/src/test/java/io/trino/connector/MockConnector.java @@ -882,7 +882,10 @@ public Optional getTableHandleForExecute(ConnectorS } @Override - public void executeTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) {} + public Map executeTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) + { + return ImmutableMap.of(); + } @Override public void finishTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle, Collection fragments, List tableExecuteState) {} diff --git a/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java b/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java index 9753f3e5fbb..f94a58b1362 100644 --- a/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java +++ b/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java @@ -170,7 +170,7 @@ public void finishTableExecute(Session session, TableExecuteHandle handle, Colle } @Override - public void executeTableExecute(Session session, TableExecuteHandle handle) + public Map executeTableExecute(Session session, TableExecuteHandle handle) { throw new UnsupportedOperationException(); } diff --git a/core/trino-spi/pom.xml b/core/trino-spi/pom.xml index 45f6e7fa269..61ce958a11c 100644 --- a/core/trino-spi/pom.xml +++ b/core/trino-spi/pom.xml @@ -832,6 +832,11 @@ method void io.trino.spi.connector.Connector::shutdown() Require connector to implement shutdown to prevent leaks + + java.method.returnTypeChanged + method void io.trino.spi.connector.ConnectorMetadata::executeTableExecute(io.trino.spi.connector.ConnectorSession, 
io.trino.spi.connector.ConnectorTableExecuteHandle) + method java.util.Map<java.lang.String, java.lang.Long> io.trino.spi.connector.ConnectorMetadata::executeTableExecute(io.trino.spi.connector.ConnectorSession, io.trino.spi.connector.ConnectorTableExecuteHandle) + diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java index 8f32252f898..132c39adf5c 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java @@ -160,9 +160,10 @@ default void finishTableExecute(ConnectorSession session, ConnectorTableExecuteH } /** - * Execute a {@link TableProcedureExecutionMode#coordinatorOnly() coordinator-only} table procedure. + * Execute a {@link TableProcedureExecutionMode#coordinatorOnly() coordinator-only} table procedure + * and return procedure execution metrics that will be populated in the query output. */ - default void executeTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) + default Map executeTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { throw new TrinoException(GENERIC_INTERNAL_ERROR, "ConnectorMetadata executeTableExecute() is not implemented"); } diff --git a/docs/src/main/sphinx/connector/iceberg.md b/docs/src/main/sphinx/connector/iceberg.md index a95dd2cfa27..9f759a03244 100644 --- a/docs/src/main/sphinx/connector/iceberg.md +++ b/docs/src/main/sphinx/connector/iceberg.md @@ -907,12 +907,39 @@ time is recommended to keep size of a table's data directory under control. 
ALTER TABLE test_table EXECUTE remove_orphan_files(retention_threshold => '7d'); ``` +```text + metric_name | metric_value +----------------------------+-------------- + processed_manifests_count | 2 + active_files_count | 98 + scanned_files_count | 97 + deleted_files_count | 0 +``` + The value for `retention_threshold` must be higher than or equal to `iceberg.remove-orphan-files.min-retention` in the catalog otherwise the procedure fails with a similar message: `Retention specified (1.00d) is shorter than the minimum retention configured in the system (7.00d)`. The default value for this property is `7d`. +The output of the query has the following metrics: + +:::{list-table} Output +:widths: 40, 60 +:header-rows: 1 + +* - Metric name + - Description +* - `processed_manifests_count` + - The count of manifest files read by `remove_orphan_files`. +* - `active_files_count` + - The count of files belonging to snapshots that have not been expired. +* - `scanned_files_count` + - The count of files scanned from the file system. +* - `deleted_files_count` + - The count of files deleted by `remove_orphan_files`.
+::: + (drop-extended-stats)= ##### drop_extended_stats diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java index 339a4b8738a..a4ea5ec77d7 100644 --- a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java +++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java @@ -211,10 +211,10 @@ public Optional getTableHandleForExecute(ConnectorS } @Override - public void executeTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) + public Map executeTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { try (ThreadContextClassLoader _ = new ThreadContextClassLoader(classLoader)) { - delegate.executeTableExecute(session, tableExecuteHandle); + return delegate.executeTableExecute(session, tableExecuteHandle); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 65d45234461..1bb4fd914e5 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -1972,31 +1972,30 @@ private static void commitTransaction(Transaction transaction, String operation) } @Override - public void executeTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) + public Map executeTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { IcebergTableExecuteHandle executeHandle = (IcebergTableExecuteHandle) tableExecuteHandle; switch (executeHandle.procedureId()) { case OPTIMIZE_MANIFESTS: executeOptimizeManifests(session, executeHandle); 
- return; + return ImmutableMap.of(); case DROP_EXTENDED_STATS: executeDropExtendedStats(session, executeHandle); - return; + return ImmutableMap.of(); case ROLLBACK_TO_SNAPSHOT: executeRollbackToSnapshot(session, executeHandle); - return; + return ImmutableMap.of(); case EXPIRE_SNAPSHOTS: executeExpireSnapshots(session, executeHandle); - return; + return ImmutableMap.of(); case REMOVE_ORPHAN_FILES: - executeRemoveOrphanFiles(session, executeHandle); - return; + return executeRemoveOrphanFiles(session, executeHandle); case ADD_FILES: executeAddFiles(session, executeHandle); - return; + return ImmutableMap.of(); case ADD_FILES_FROM_TABLE: executeAddFilesFromTable(session, executeHandle); - return; + return ImmutableMap.of(); default: throw new IllegalArgumentException("Unknown procedure '" + executeHandle.procedureId() + "'"); } @@ -2118,7 +2117,7 @@ private static void validateTableExecuteParameters( sessionMinRetentionParameterName); } - public void executeRemoveOrphanFiles(ConnectorSession session, IcebergTableExecuteHandle executeHandle) + public Map executeRemoveOrphanFiles(ConnectorSession session, IcebergTableExecuteHandle executeHandle) { IcebergRemoveOrphanFilesHandle removeOrphanFilesHandle = (IcebergRemoveOrphanFilesHandle) executeHandle.procedureHandle(); @@ -2135,14 +2134,14 @@ public void executeRemoveOrphanFiles(ConnectorSession session, IcebergTableExecu if (table.currentSnapshot() == null) { log.debug("Skipping remove_orphan_files procedure for empty table %s", table); - return; + return ImmutableMap.of(); } Instant expiration = session.getStart().minusMillis(retention.toMillis()); - removeOrphanFiles(table, session, executeHandle.schemaTableName(), expiration, executeHandle.fileIoProperties()); + return removeOrphanFiles(table, session, executeHandle.schemaTableName(), expiration, executeHandle.fileIoProperties()); } - private void removeOrphanFiles(Table table, ConnectorSession session, SchemaTableName schemaTableName, Instant expiration, Map 
fileIoProperties) + private Map removeOrphanFiles(Table table, ConnectorSession session, SchemaTableName schemaTableName, Instant expiration, Map fileIoProperties) { Set processedManifestFilePaths = new HashSet<>(); // Similarly to issues like https://github.com/trinodb/trino/issues/13759, equivalent paths may have different String @@ -2205,7 +2204,18 @@ private void removeOrphanFiles(Table table, ConnectorSession session, SchemaTabl // Ensure any futures still running are canceled in case of failure manifestScanFutures.forEach(future -> future.cancel(true)); } - scanAndDeleteInvalidFiles(table, session, schemaTableName, expiration, validFileNames, fileIoProperties); + ScanAndDeleteResult result = scanAndDeleteInvalidFiles(table, session, schemaTableName, expiration, validFileNames, fileIoProperties); + log.info("remove_orphan_files for table %s processed %d manifest files, found %d active files, scanned %d files, deleted %d files", + schemaTableName, + processedManifestFilePaths.size(), + validFileNames.size() - 1, // excluding version-hint.text + result.scannedFilesCount(), + result.deletedFilesCount()); + return ImmutableMap.of( + "processed_manifests_count", (long) processedManifestFilePaths.size(), + "active_files_count", (long) validFileNames.size() - 1, // excluding version-hint.text + "scanned_files_count", result.scannedFilesCount(), + "deleted_files_count", result.deletedFilesCount()); } public void executeAddFiles(ConnectorSession session, IcebergTableExecuteHandle executeHandle) @@ -2240,17 +2250,21 @@ public void executeAddFilesFromTable(ConnectorSession session, IcebergTableExecu icebergScanExecutor); } - private void scanAndDeleteInvalidFiles(Table table, ConnectorSession session, SchemaTableName schemaTableName, Instant expiration, Set validFiles, Map fileIoProperties) + private ScanAndDeleteResult scanAndDeleteInvalidFiles(Table table, ConnectorSession session, SchemaTableName schemaTableName, Instant expiration, Set validFiles, Map 
fileIoProperties) { List> deleteFutures = new ArrayList<>(); + long scannedFilesCount = 0; + long deletedFilesCount = 0; try { List filesToDelete = new ArrayList<>(DELETE_BATCH_SIZE); TrinoFileSystem fileSystem = fileSystemFactory.create(session.getIdentity(), fileIoProperties); FileIterator allFiles = fileSystem.listFiles(Location.of(table.location())); while (allFiles.hasNext()) { FileEntry entry = allFiles.next(); + scannedFilesCount++; if (entry.lastModified().isBefore(expiration) && !validFiles.contains(entry.location().fileName())) { filesToDelete.add(entry.location()); + deletedFilesCount++; if (filesToDelete.size() >= DELETE_BATCH_SIZE) { List finalFilesToDelete = filesToDelete; deleteFutures.add(icebergFileDeleteExecutor.submit(() -> deleteFiles(finalFilesToDelete, schemaTableName, fileSystem))); @@ -2277,8 +2291,11 @@ private void scanAndDeleteInvalidFiles(Table table, ConnectorSession session, Sc // Ensure any futures still running are canceled in case of failure deleteFutures.forEach(future -> future.cancel(true)); } + return new ScanAndDeleteResult(scannedFilesCount, deletedFilesCount); } + private record ScanAndDeleteResult(long scannedFilesCount, long deletedFilesCount) {} + private void deleteFiles(List files, SchemaTableName schemaTableName, TrinoFileSystem fileSystem) { log.debug("Deleting files while removing orphan files for table %s [%s]", schemaTableName, files); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 757c87682b5..445756420cc 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -6645,7 +6645,10 @@ public void testRemoveOrphanFiles() List initialDataFiles = getAllDataFilesFromTableDirectory(tableName); 
assertThat(initialDataFiles).contains(orphanFile); - assertQuerySucceeds(sessionWithShortRetentionUnlocked, "ALTER TABLE " + tableName + " EXECUTE REMOVE_ORPHAN_FILES (retention_threshold => '0s')"); + assertUpdate( + sessionWithShortRetentionUnlocked, + "ALTER TABLE " + tableName + " EXECUTE REMOVE_ORPHAN_FILES (retention_threshold => '0s')", + "VALUES ('processed_manifests_count', 3), ('active_files_count', 16), ('scanned_files_count', 17), ('deleted_files_count', 1)"); assertQuery("SELECT * FROM " + tableName, "VALUES ('one', 1), ('three', 3)"); List updatedDataFiles = getAllDataFilesFromTableDirectory(tableName); diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseMetadata.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseMetadata.java index fd581d527d5..8aade670180 100644 --- a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseMetadata.java +++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseMetadata.java @@ -198,9 +198,9 @@ public void finishTableExecute(ConnectorSession session, ConnectorTableExecuteHa } @Override - public void executeTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) + public Map executeTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { - forHandle(tableExecuteHandle).executeTableExecute(session, tableExecuteHandle); + return forHandle(tableExecuteHandle).executeTableExecute(session, tableExecuteHandle); } @Override diff --git a/testing/trino-testing/src/main/java/io/trino/testing/QueryAssertions.java b/testing/trino-testing/src/main/java/io/trino/testing/QueryAssertions.java index 81f620546ef..7707bbf0605 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/QueryAssertions.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/QueryAssertions.java @@ -336,7 +336,7 @@ private static void assertDistributedQuery( List actualRows = 
actualResults.getMaterializedRows(); List expectedRows = expectedResults.getMaterializedRows(); - if (compareUpdate) { + if (compareUpdate && !actualResults.getUpdateType().equals(Optional.of("ALTER TABLE EXECUTE"))) { if (actualResults.getUpdateType().isEmpty()) { fail("update type not present for query " + queryId + ": \n" + actual); }