Skip to content

Commit 1ee97a9

Browse files
committed
Allow coordinator only procedures to output metrics in result
1 parent 75adfcd commit 1ee97a9

File tree

17 files changed

+76
-45
lines changed

17 files changed

+76
-45
lines changed

core/trino-main/src/main/java/io/trino/metadata/Metadata.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ Optional<TableExecuteHandle> getTableHandleForExecute(
121121

122122
void finishTableExecute(Session session, TableExecuteHandle handle, Collection<Slice> fragments, List<Object> tableExecuteState);
123123

124-
void executeTableExecute(Session session, TableExecuteHandle handle);
124+
Map<String, Long> executeTableExecute(Session session, TableExecuteHandle handle);
125125

126126
TableProperties getTableProperties(Session session, TableHandle handle);
127127

core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -362,11 +362,11 @@ public void finishTableExecute(Session session, TableExecuteHandle tableExecuteH
362362
}
363363

364364
@Override
365-
public void executeTableExecute(Session session, TableExecuteHandle tableExecuteHandle)
365+
public Map<String, Long> executeTableExecute(Session session, TableExecuteHandle tableExecuteHandle)
366366
{
367367
CatalogHandle catalogHandle = tableExecuteHandle.catalogHandle();
368368
ConnectorMetadata metadata = getMetadata(session, catalogHandle);
369-
metadata.executeTableExecute(session.toConnectorSession(catalogHandle), tableExecuteHandle.connectorHandle());
369+
return metadata.executeTableExecute(session.toConnectorSession(catalogHandle), tableExecuteHandle.connectorHandle());
370370
}
371371

372372
@Override

core/trino-main/src/main/java/io/trino/operator/SimpleTableExecuteOperator.java

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,26 @@
1313
*/
1414
package io.trino.operator;
1515

16+
import com.google.common.collect.ImmutableList;
17+
import io.airlift.slice.Slices;
1618
import io.trino.Session;
1719
import io.trino.metadata.Metadata;
1820
import io.trino.metadata.TableExecuteHandle;
1921
import io.trino.spi.Page;
22+
import io.trino.spi.PageBuilder;
23+
import io.trino.spi.block.BlockBuilder;
24+
import io.trino.spi.type.Type;
2025
import io.trino.sql.planner.plan.PlanNodeId;
2126

27+
import java.util.List;
28+
import java.util.Map;
29+
2230
import static com.google.common.base.Preconditions.checkState;
2331
import static java.util.Objects.requireNonNull;
2432

2533
public class SimpleTableExecuteOperator
2634
implements Operator
2735
{
28-
private static final Page PAGE = new Page(0);
29-
3036
public static class SimpleTableExecuteOperatorOperatorFactory
3137
implements OperatorFactory
3238
{
@@ -35,20 +41,23 @@ public static class SimpleTableExecuteOperatorOperatorFactory
3541
private final Metadata metadata;
3642
private final Session session;
3743
private final TableExecuteHandle executeHandle;
44+
private final List<Type> types;
3845
private boolean closed;
3946

4047
public SimpleTableExecuteOperatorOperatorFactory(
4148
int operatorId,
4249
PlanNodeId planNodeId,
4350
Metadata metadata,
4451
Session session,
45-
TableExecuteHandle executeHandle)
52+
TableExecuteHandle executeHandle,
53+
List<Type> types)
4654
{
4755
this.operatorId = operatorId;
4856
this.planNodeId = requireNonNull(planNodeId, "planNodeId is null");
4957
this.metadata = requireNonNull(metadata, "planNodeId is null");
5058
this.session = requireNonNull(session, "planNodeId is null");
5159
this.executeHandle = requireNonNull(executeHandle, "executeHandle is null");
60+
this.types = ImmutableList.copyOf(requireNonNull(types, "types is null"));
5261
}
5362

5463
@Override
@@ -60,7 +69,8 @@ public Operator createOperator(DriverContext driverContext)
6069
context,
6170
metadata,
6271
session,
63-
executeHandle);
72+
executeHandle,
73+
types);
6474
}
6575

6676
@Override
@@ -77,27 +87,31 @@ public OperatorFactory duplicate()
7787
planNodeId,
7888
metadata,
7989
session,
80-
executeHandle);
90+
executeHandle,
91+
types);
8192
}
8293
}
8394

8495
private final OperatorContext operatorContext;
8596
private final Metadata metadata;
8697
private final Session session;
8798
private final TableExecuteHandle executeHandle;
99+
private final List<Type> types;
88100

89101
private boolean finished;
90102

91103
public SimpleTableExecuteOperator(
92104
OperatorContext operatorContext,
93105
Metadata metadata,
94106
Session session,
95-
TableExecuteHandle executeHandle)
107+
TableExecuteHandle executeHandle,
108+
List<Type> types)
96109
{
97110
this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
98111
this.metadata = requireNonNull(metadata, "metadata is null");
99112
this.session = requireNonNull(session, "session is null");
100113
this.executeHandle = requireNonNull(executeHandle, "executeHandle is null");
114+
this.types = ImmutableList.copyOf(requireNonNull(types, "types is null"));
101115
}
102116

103117
@Override
@@ -125,9 +139,17 @@ public Page getOutput()
125139
return null;
126140
}
127141

128-
metadata.executeTableExecute(session, executeHandle);
142+
Map<String, Long> metrics = metadata.executeTableExecute(session, executeHandle);
129143
finished = true;
130-
return PAGE;
144+
PageBuilder pageBuilder = new PageBuilder(types);
145+
BlockBuilder metricNameBuilder = pageBuilder.getBlockBuilder(0);
146+
BlockBuilder metricValueBuilder = pageBuilder.getBlockBuilder(1);
147+
for (Map.Entry<String, Long> entry : metrics.entrySet()) {
148+
types.get(0).writeSlice(metricNameBuilder, Slices.utf8Slice(entry.getKey()));
149+
types.get(1).writeLong(metricValueBuilder, entry.getValue());
150+
pageBuilder.declarePosition();
151+
}
152+
return pageBuilder.build();
131153
}
132154

133155
@Override

core/trino-main/src/main/java/io/trino/sql/analyzer/StatementAnalyzer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1323,6 +1323,9 @@ protected Scope visitTableExecute(TableExecute node, Optional<Scope> scope)
13231323
analysis.setUpdateType("ALTER TABLE EXECUTE");
13241324
analysis.setUpdateTarget(executeHandle.catalogHandle().getVersion(), tableName, Optional.of(table), Optional.empty());
13251325

1326+
if (!procedureMetadata.getExecutionMode().isReadsData()) {
1327+
return createAndAssignScope(node, scope, Field.newUnqualified("metric_name", VARCHAR), Field.newUnqualified("metric_value", BIGINT));
1328+
}
13261329
return createAndAssignScope(node, scope, Field.newUnqualified("rows", BIGINT));
13271330
}
13281331

core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3476,7 +3476,8 @@ public PhysicalOperation visitSimpleTableExecuteNode(SimpleTableExecuteNode node
34763476
node.getId(),
34773477
metadata,
34783478
session,
3479-
node.getExecuteHandle());
3479+
node.getExecuteHandle(),
3480+
getSymbolTypes(node.getOutputSymbols()));
34803481

34813482
return new PhysicalOperation(operatorFactory, makeLayout(node));
34823483
}

core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -966,7 +966,9 @@ private RelationPlan createTableExecutePlan(Analysis analysis, TableExecute stat
966966
if (!analysis.isTableExecuteReadsData()) {
967967
SimpleTableExecuteNode node = new SimpleTableExecuteNode(
968968
idAllocator.getNextId(),
969-
symbolAllocator.newSymbol("rows", BIGINT),
969+
ImmutableList.of(
970+
symbolAllocator.newSymbol("metricName", VARCHAR),
971+
symbolAllocator.newSymbol("metricValue", BIGINT)),
970972
executeHandle);
971973
return new RelationPlan(node, analysis.getRootScope(), node.getOutputSymbols(), Optional.empty());
972974
}

core/trino-main/src/main/java/io/trino/sql/planner/optimizations/UnaliasSymbolReferences.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -687,12 +687,11 @@ public PlanAndMappings visitSimpleTableExecuteNode(SimpleTableExecuteNode node,
687687
{
688688
Map<Symbol, Symbol> mapping = new HashMap<>(context.getCorrelationMapping());
689689
SymbolMapper mapper = symbolMapper(mapping);
690-
Symbol newOutput = mapper.map(node.getOutput());
691690

692691
return new PlanAndMappings(
693692
new SimpleTableExecuteNode(
694693
node.getId(),
695-
newOutput,
694+
mapper.map(node.getOutputSymbols()),
696695
node.getExecuteHandle()),
697696
mapping);
698697
}

core/trino-main/src/main/java/io/trino/sql/planner/plan/SimpleTableExecuteNode.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,17 @@
2727
public class SimpleTableExecuteNode
2828
extends PlanNode
2929
{
30-
private final Symbol output;
30+
private final List<Symbol> outputs;
3131
private final TableExecuteHandle executeHandle;
3232

3333
@JsonCreator
3434
public SimpleTableExecuteNode(
3535
@JsonProperty("id") PlanNodeId id,
36-
@JsonProperty("output") Symbol output,
36+
@JsonProperty("outputs") List<Symbol> outputs,
3737
@JsonProperty("executeHandle") TableExecuteHandle executeHandle)
3838
{
3939
super(id);
40-
this.output = requireNonNull(output, "output is null");
40+
this.outputs = ImmutableList.copyOf(requireNonNull(outputs, "outputs is null"));
4141
this.executeHandle = requireNonNull(executeHandle, "executeHandle is null");
4242
}
4343

@@ -48,16 +48,11 @@ public List<PlanNode> getSources()
4848
return ImmutableList.of();
4949
}
5050

51+
@JsonProperty("outputs")
5152
@Override
5253
public List<Symbol> getOutputSymbols()
5354
{
54-
return ImmutableList.of(output);
55-
}
56-
57-
@JsonProperty
58-
public Symbol getOutput()
59-
{
60-
return output;
55+
return outputs;
6156
}
6257

6358
@Override

core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,11 +181,11 @@ public void finishTableExecute(ConnectorSession session, ConnectorTableExecuteHa
181181
}
182182

183183
@Override
184-
public void executeTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle)
184+
public Map<String, Long> executeTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle)
185185
{
186186
Span span = startSpan("executeTableExecute", tableExecuteHandle);
187187
try (var _ = scopedSpan(span)) {
188-
delegate.executeTableExecute(session, tableExecuteHandle);
188+
return delegate.executeTableExecute(session, tableExecuteHandle);
189189
}
190190
}
191191

core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,11 +236,11 @@ public void finishTableExecute(Session session, TableExecuteHandle handle, Colle
236236
}
237237

238238
@Override
239-
public void executeTableExecute(Session session, TableExecuteHandle handle)
239+
public Map<String, Long> executeTableExecute(Session session, TableExecuteHandle handle)
240240
{
241241
Span span = startSpan("executeTableExecute", handle);
242242
try (var _ = scopedSpan(span)) {
243-
delegate.executeTableExecute(session, handle);
243+
return delegate.executeTableExecute(session, handle);
244244
}
245245
}
246246

0 commit comments

Comments
 (0)