@@ -121,7 +121,7 @@ Optional<TableExecuteHandle> getTableHandleForExecute(

void finishTableExecute(Session session, TableExecuteHandle handle, Collection<Slice> fragments, List<Object> tableExecuteState);

void executeTableExecute(Session session, TableExecuteHandle handle);
Map<String, Long> executeTableExecute(Session session, TableExecuteHandle handle);
Contributor:
Why Map<String, Long> instead of something more generic?

Contributor:
Like some connector-specific handle that has a method to get a Page and a list of symbols?

Member Author:
This method runs directly inside SimpleTableExecuteOperator, so we don't need anything complex.
There is no need for anything more generic here at the moment; this satisfies the use case of all the Iceberg procedures where we want to add it.

Member:
What if we want to expose additional metrics as a double or some other type?

Contributor:
Sure, but it could be used for other cases as well. I can imagine that other table procedures will output some kind of information.

Member Author:
We've had one BIGINT field all these years and failed to even populate that properly.
So no, I don't think there are many use cases waiting for a more generic framework.
I've gone through all the Iceberg procedures, and everything worth showing in the output is an integer. See the Outputs sections in https://iceberg.apache.org/docs/latest/spark-procedures/
There is nothing stopping anyone from using something more generic like io.trino.spi.metrics.Metric in the future. At the moment, I find it unnecessary.

Member:
I could imagine OPTIMIZE producing something like an input file size histogram.
We could just use io.trino.spi.metrics.Metrics here.
However, I think Map<String, Long> is also OK for now.
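
For illustration, a connector-side implementation of the new signature could populate the map from counters gathered while the procedure runs. This is a sketch only; the class, metric names, and counters below are hypothetical, not part of this PR:

```java
import com.google.common.collect.ImmutableMap;
import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableExecuteHandle;

import java.util.Map;

// Hypothetical connector metadata; a real connector implements many more methods.
public class ExampleProcedureMetadata
        implements ConnectorMetadata
{
    @Override
    public Map<String, Long> executeTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle)
    {
        long scannedFiles = 0;
        long deletedFiles = 0;
        // ... run the coordinator-only procedure here, incrementing the counters ...
        return ImmutableMap.of(
                "scanned_files_count", scannedFiles,
                "deleted_files_count", deletedFiles);
    }
}
```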


TableProperties getTableProperties(Session session, TableHandle handle);

@@ -362,11 +362,11 @@ public void finishTableExecute(Session session, TableExecuteHandle tableExecuteHandle
}

@Override
public void executeTableExecute(Session session, TableExecuteHandle tableExecuteHandle)
public Map<String, Long> executeTableExecute(Session session, TableExecuteHandle tableExecuteHandle)
{
CatalogHandle catalogHandle = tableExecuteHandle.catalogHandle();
ConnectorMetadata metadata = getMetadata(session, catalogHandle);
metadata.executeTableExecute(session.toConnectorSession(catalogHandle), tableExecuteHandle.connectorHandle());
return metadata.executeTableExecute(session.toConnectorSession(catalogHandle), tableExecuteHandle.connectorHandle());
}

@Override
@@ -13,20 +13,26 @@
*/
package io.trino.operator;

import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slices;
import io.trino.Session;
import io.trino.metadata.Metadata;
import io.trino.metadata.TableExecuteHandle;
import io.trino.spi.Page;
import io.trino.spi.PageBuilder;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.type.Type;
import io.trino.sql.planner.plan.PlanNodeId;

import java.util.List;
import java.util.Map;

import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;

public class SimpleTableExecuteOperator
implements Operator
{
private static final Page PAGE = new Page(0);

public static class SimpleTableExecuteOperatorOperatorFactory
implements OperatorFactory
{
@@ -35,20 +41,23 @@ public static class SimpleTableExecuteOperatorOperatorFactory
private final Metadata metadata;
private final Session session;
private final TableExecuteHandle executeHandle;
private final List<Type> types;
private boolean closed;

public SimpleTableExecuteOperatorOperatorFactory(
int operatorId,
PlanNodeId planNodeId,
Metadata metadata,
Session session,
TableExecuteHandle executeHandle)
TableExecuteHandle executeHandle,
List<Type> types)
{
this.operatorId = operatorId;
this.planNodeId = requireNonNull(planNodeId, "planNodeId is null");
this.metadata = requireNonNull(metadata, "metadata is null");
this.session = requireNonNull(session, "session is null");
this.executeHandle = requireNonNull(executeHandle, "executeHandle is null");
this.types = ImmutableList.copyOf(requireNonNull(types, "types is null"));
}

@Override
@@ -60,7 +69,8 @@ public Operator createOperator(DriverContext driverContext)
context,
metadata,
session,
executeHandle);
executeHandle,
types);
}

@Override
@@ -77,27 +87,31 @@ public OperatorFactory duplicate()
planNodeId,
metadata,
session,
executeHandle);
executeHandle,
types);
}
}

private final OperatorContext operatorContext;
private final Metadata metadata;
private final Session session;
private final TableExecuteHandle executeHandle;
private final List<Type> types;

private boolean finished;

public SimpleTableExecuteOperator(
OperatorContext operatorContext,
Metadata metadata,
Session session,
TableExecuteHandle executeHandle)
TableExecuteHandle executeHandle,
List<Type> types)
{
this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
this.metadata = requireNonNull(metadata, "metadata is null");
this.session = requireNonNull(session, "session is null");
this.executeHandle = requireNonNull(executeHandle, "executeHandle is null");
this.types = ImmutableList.copyOf(requireNonNull(types, "types is null"));
}

@Override
@@ -125,9 +139,17 @@ public Page getOutput()
return null;
}

metadata.executeTableExecute(session, executeHandle);
Map<String, Long> metrics = metadata.executeTableExecute(session, executeHandle);
finished = true;
return PAGE;
PageBuilder pageBuilder = new PageBuilder(types);
BlockBuilder metricNameBuilder = pageBuilder.getBlockBuilder(0);
BlockBuilder metricValueBuilder = pageBuilder.getBlockBuilder(1);
for (Map.Entry<String, Long> entry : metrics.entrySet()) {
types.get(0).writeSlice(metricNameBuilder, Slices.utf8Slice(entry.getKey()));
types.get(1).writeLong(metricValueBuilder, entry.getValue());
pageBuilder.declarePosition();
}
return pageBuilder.build();
}
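
One consequence of the new loop worth noting (my reading, not stated in the PR): if the connector returns an empty metrics map, no positions are declared, so the operator emits a page with the two output channels but zero rows. A minimal sketch of that shape, assuming the VARCHAR and BIGINT output types introduced by the analyzer change below:

```java
import io.trino.spi.Page;
import io.trino.spi.PageBuilder;

import java.util.List;

import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.VarcharType.VARCHAR;

public class EmptyMetricsPageShape
{
    public static void main(String[] args)
    {
        // Same construction as the operator, but with no metrics written
        PageBuilder pageBuilder = new PageBuilder(List.of(VARCHAR, BIGINT));
        Page page = pageBuilder.build();
        System.out.println(page.getChannelCount());  // prints 2
        System.out.println(page.getPositionCount()); // prints 0
    }
}
```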

@Override
@@ -1323,6 +1323,9 @@ protected Scope visitTableExecute(TableExecute node, Optional<Scope> scope)
analysis.setUpdateType("ALTER TABLE EXECUTE");
analysis.setUpdateTarget(executeHandle.catalogHandle().getVersion(), tableName, Optional.of(table), Optional.empty());

if (!procedureMetadata.getExecutionMode().isReadsData()) {
Member:
Do we need this special case?

return createAndAssignScope(node, scope, Field.newUnqualified("metric_name", VARCHAR), Field.newUnqualified("metric_value", BIGINT));
Member:
This will display the `metric_name` and `metric_value` columns even if there are no rows?

I think it would be more natural to have each metric as a separate titled column, but IMO the current solution is OK for now.

}
return createAndAssignScope(node, scope, Field.newUnqualified("rows", BIGINT));
}
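
To make the two branches concrete, here is a hypothetical session (table name assumed): a coordinator-only procedure such as `remove_orphan_files` now resolves to the two-column `(metric_name, metric_value)` schema, while a procedure that reads data, such as `optimize`, keeps the single `rows` column.

```sql
-- Coordinator-only procedure: output schema is (metric_name VARCHAR, metric_value BIGINT)
ALTER TABLE example_table EXECUTE remove_orphan_files(retention_threshold => '7d');

-- Procedure that reads data: output schema remains (rows BIGINT)
ALTER TABLE example_table EXECUTE optimize;
```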

@@ -3476,7 +3476,8 @@ public PhysicalOperation visitSimpleTableExecuteNode(SimpleTableExecuteNode node
node.getId(),
metadata,
session,
node.getExecuteHandle());
node.getExecuteHandle(),
getSymbolTypes(node.getOutputSymbols()));

return new PhysicalOperation(operatorFactory, makeLayout(node));
}
@@ -966,7 +966,9 @@ private RelationPlan createTableExecutePlan(Analysis analysis, TableExecute stat
if (!analysis.isTableExecuteReadsData()) {
SimpleTableExecuteNode node = new SimpleTableExecuteNode(
idAllocator.getNextId(),
symbolAllocator.newSymbol("rows", BIGINT),
ImmutableList.of(
symbolAllocator.newSymbol("metricName", VARCHAR),
symbolAllocator.newSymbol("metricValue", BIGINT)),
executeHandle);
return new RelationPlan(node, analysis.getRootScope(), node.getOutputSymbols(), Optional.empty());
}
@@ -687,12 +687,11 @@ public PlanAndMappings visitSimpleTableExecuteNode(SimpleTableExecuteNode node,
{
Map<Symbol, Symbol> mapping = new HashMap<>(context.getCorrelationMapping());
SymbolMapper mapper = symbolMapper(mapping);
Symbol newOutput = mapper.map(node.getOutput());

return new PlanAndMappings(
new SimpleTableExecuteNode(
node.getId(),
newOutput,
mapper.map(node.getOutputSymbols()),
node.getExecuteHandle()),
mapping);
}
@@ -27,17 +27,17 @@
public class SimpleTableExecuteNode
extends PlanNode
{
private final Symbol output;
private final List<Symbol> outputs;
private final TableExecuteHandle executeHandle;

@JsonCreator
public SimpleTableExecuteNode(
@JsonProperty("id") PlanNodeId id,
@JsonProperty("output") Symbol output,
@JsonProperty("outputs") List<Symbol> outputs,
@JsonProperty("executeHandle") TableExecuteHandle executeHandle)
{
super(id);
this.output = requireNonNull(output, "output is null");
this.outputs = ImmutableList.copyOf(requireNonNull(outputs, "outputs is null"));
this.executeHandle = requireNonNull(executeHandle, "executeHandle is null");
}

@@ -48,16 +48,11 @@ public List<PlanNode> getSources()
return ImmutableList.of();
}

@JsonProperty("outputs")
@Override
public List<Symbol> getOutputSymbols()
{
return ImmutableList.of(output);
}

@JsonProperty
public Symbol getOutput()
{
return output;
return outputs;
}

@Override
@@ -181,11 +181,11 @@ public void finishTableExecute(ConnectorSession session, ConnectorTableExecuteHa
}

@Override
public void executeTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle)
public Map<String, Long> executeTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle)
{
Span span = startSpan("executeTableExecute", tableExecuteHandle);
try (var _ = scopedSpan(span)) {
delegate.executeTableExecute(session, tableExecuteHandle);
return delegate.executeTableExecute(session, tableExecuteHandle);
}
}

@@ -236,11 +236,11 @@ public void finishTableExecute(Session session, TableExecuteHandle handle, Colle
}

@Override
public void executeTableExecute(Session session, TableExecuteHandle handle)
public Map<String, Long> executeTableExecute(Session session, TableExecuteHandle handle)
{
Span span = startSpan("executeTableExecute", handle);
try (var _ = scopedSpan(span)) {
delegate.executeTableExecute(session, handle);
return delegate.executeTableExecute(session, handle);
}
}

@@ -882,7 +882,10 @@ public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(ConnectorS
}

@Override
public void executeTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) {}
public Map<String, Long> executeTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle)
{
return ImmutableMap.of();
}

@Override
public void finishTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle, Collection<Slice> fragments, List<Object> tableExecuteState) {}
@@ -170,7 +170,7 @@ public void finishTableExecute(Session session, TableExecuteHandle handle, Colle
}

@Override
public void executeTableExecute(Session session, TableExecuteHandle handle)
public Map<String, Long> executeTableExecute(Session session, TableExecuteHandle handle)
{
throw new UnsupportedOperationException();
}
5 changes: 5 additions & 0 deletions core/trino-spi/pom.xml
@@ -832,6 +832,11 @@
<new>method void io.trino.spi.connector.Connector::shutdown()</new>
<justification>Require connector to implement shutdown to prevent leaks</justification>
</item>
<item>
<code>java.method.returnTypeChanged</code>
<old>method void io.trino.spi.connector.ConnectorMetadata::executeTableExecute(io.trino.spi.connector.ConnectorSession, io.trino.spi.connector.ConnectorTableExecuteHandle)</old>
<new>method java.util.Map&lt;java.lang.String, java.lang.Long&gt; io.trino.spi.connector.ConnectorMetadata::executeTableExecute(io.trino.spi.connector.ConnectorSession, io.trino.spi.connector.ConnectorTableExecuteHandle)</new>
</item>
</differences>
</revapi.differences>
</analysisConfiguration>
@@ -160,9 +160,10 @@ default void finishTableExecute(ConnectorSession session, ConnectorTableExecuteH
}

/**
* Execute a {@link TableProcedureExecutionMode#coordinatorOnly() coordinator-only} table procedure.
* Execute a {@link TableProcedureExecutionMode#coordinatorOnly() coordinator-only} table procedure
* and return procedure execution metrics that will be populated in the query output.
*/
default void executeTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle)
default Map<String, Long> executeTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle)
Member:
Are we planning to limit ourselves to Long, or can metrics be another type like double?

Member:
Or should we return them as a Row type?

Member Author:
It's limited to long for now, as that's what the current procedures need.

Member:
> Or should we return them as a Row type?

In the future, this should probably be a Page where channels correspond to metric names.

{
throw new TrinoException(GENERIC_INTERNAL_ERROR, "ConnectorMetadata executeTableExecute() is not implemented");
}
27 changes: 27 additions & 0 deletions docs/src/main/sphinx/connector/iceberg.md
@@ -907,12 +907,39 @@ time is recommended to keep size of a table's data directory under control.
ALTER TABLE test_table EXECUTE remove_orphan_files(retention_threshold => '7d');
```

```text
        metric_name         | metric_value
----------------------------+--------------
 processed_manifests_count  | 2
 active_files_count         | 98
 scanned_files_count        | 97
 deleted_files_count        | 0
```

Contributor:
I noticed the output in Spark looks different from this: https://iceberg.apache.org/docs/latest/spark-procedures/#output_7.

Member Author (@raunaqmorarka), Sep 26, 2025:
Yes, the Spark output prints the deleted file names. I don't want to do that, since that is potentially a large list. The main use case here is to provide a summary of what the procedure did. Right now you get no clue whether the procedure did anything at all, unless you run metadata queries before and after and look for differences in the table.

The value for `retention_threshold` must be higher than or equal to
`iceberg.remove-orphan-files.min-retention` in the catalog; otherwise, the
procedure fails with a message similar to: `Retention specified (1.00d) is shorter
than the minimum retention configured in the system (7.00d)`. The default value
for this property is `7d`.
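
For example, with the default minimum retention in place, a call like the following (table name assumed) fails with that message:

```sql
ALTER TABLE test_table EXECUTE remove_orphan_files(retention_threshold => '1d');
```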

The output of the query has the following metrics:

:::{list-table} Output
:widths: 40, 60
:header-rows: 1

* - Property name
- Description
* - `processed_manifests_count`
- The count of manifest files read by `remove_orphan_files`.
* - `active_files_count`
- The count of files belonging to snapshots that have not been expired.
* - `scanned_files_count`
- The count of files scanned from the file system.
* - `deleted_files_count`
- The count of files deleted by `remove_orphan_files`.
:::

(drop-extended-stats)=
##### drop_extended_stats

@@ -211,10 +211,10 @@ public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(ConnectorS
}

@Override
public void executeTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle)
public Map<String, Long> executeTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle)
{
try (ThreadContextClassLoader _ = new ThreadContextClassLoader(classLoader)) {
delegate.executeTableExecute(session, tableExecuteHandle);
return delegate.executeTableExecute(session, tableExecuteHandle);
}
}

Expand Down