Skip to content

Commit 5c717d6

Browse files
mkarrmannfacebook-github-bot
authored andcommitted
feat: Make table info JSON length configurable (prestodb#26731)
Summary: ## Problem Currently, `TableFinishInfo` hard-codes a 10 MB limit for serialized metadata info. If that size limit is exceeded, data is dropped. This cannot be configured, and is problematic for users who rely upon `TableFinishInfo` containing complete information ## Solution - Make the length limit configurable via a Session Property. - Make the default value of the session property configurable at the cluster-level. - When the length limit is set to 0, entirely bypass enforcing a size limit. (This is slightly awkward since users still need to set arbitrary units for the `DataSize`, but this is acceptable IMO) Differential Revision: D87825974
1 parent c526fa0 commit 5c717d6

File tree

11 files changed

+143
-26
lines changed

11 files changed

+143
-26
lines changed

presto-main-base/src/main/java/com/facebook/presto/SystemSessionProperties.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.facebook.presto.execution.warnings.WarningCollectorConfig;
2828
import com.facebook.presto.memory.MemoryManagerConfig;
2929
import com.facebook.presto.memory.NodeMemoryConfig;
30+
import com.facebook.presto.operator.TableFinishConfig;
3031
import com.facebook.presto.spi.PrestoException;
3132
import com.facebook.presto.spi.eventlistener.CTEInformation;
3233
import com.facebook.presto.spi.security.ViewSecurity;
@@ -133,6 +134,7 @@ public final class SystemSessionProperties
133134
public static final String SCALE_WRITERS = "scale_writers";
134135
public static final String WRITER_MIN_SIZE = "writer_min_size";
135136
public static final String OPTIMIZED_SCALE_WRITER_PRODUCER_BUFFER = "optimized_scale_writer_producer_buffer";
137+
public static final String TABLE_FINISH_INFO_JSON_LENGTH_LIMIT = "table_finish_info_json_length_limit";
136138
public static final String PUSH_TABLE_WRITE_THROUGH_UNION = "push_table_write_through_union";
137139
public static final String EXECUTION_POLICY = "execution_policy";
138140
public static final String DICTIONARY_AGGREGATION = "dictionary_aggregation";
@@ -382,7 +384,8 @@ public SystemSessionProperties()
382384
new NodeSpillConfig(),
383385
new TracingConfig(),
384386
new CompilerConfig(),
385-
new HistoryBasedOptimizationConfig());
387+
new HistoryBasedOptimizationConfig(),
388+
new TableFinishConfig());
386389
}
387390

388391
@Inject
@@ -398,7 +401,8 @@ public SystemSessionProperties(
398401
NodeSpillConfig nodeSpillConfig,
399402
TracingConfig tracingConfig,
400403
CompilerConfig compilerConfig,
401-
HistoryBasedOptimizationConfig historyBasedOptimizationConfig)
404+
HistoryBasedOptimizationConfig historyBasedOptimizationConfig,
405+
TableFinishConfig tableFinishConfig)
402406
{
403407
sessionProperties = ImmutableList.of(
404408
integerProperty(
@@ -560,6 +564,11 @@ public SystemSessionProperties(
560564
"Optimize scale writer creation based on producer buffer",
561565
featuresConfig.isOptimizedScaleWriterProducerBuffer(),
562566
true),
567+
integerProperty(
568+
TABLE_FINISH_INFO_JSON_LENGTH_LIMIT,
569+
"Maximum number of characters in connector output metadata JSON in table finish info",
570+
tableFinishConfig.getTableFinishInfoJsonLengthLimit(),
571+
true),
563572
booleanProperty(
564573
PUSH_TABLE_WRITE_THROUGH_UNION,
565574
"Parallelize writes when using UNION ALL in queries that write data",
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.operator;
15+
16+
import com.facebook.airlift.configuration.Config;
17+
import com.facebook.airlift.configuration.ConfigDescription;
18+
import jakarta.validation.constraints.NotNull;
19+
20+
public class TableFinishConfig
21+
{
22+
private int tableFinishInfoJsonLengthLimit = 10_000_000;
23+
24+
@NotNull
25+
public int getTableFinishInfoJsonLengthLimit()
26+
{
27+
return tableFinishInfoJsonLengthLimit;
28+
}
29+
30+
@Config("table-finish-info-json-length-limit")
31+
@ConfigDescription("Maximum number of characters in connector output metadata JSON in table finish info")
32+
public TableFinishConfig setTableFinishInfoJsonLengthLimit(int tableFinishInfoJsonLengthLimit)
33+
{
34+
this.tableFinishInfoJsonLengthLimit = tableFinishInfoJsonLengthLimit;
35+
return this;
36+
}
37+
}

presto-main-base/src/main/java/com/facebook/presto/operator/TableFinishInfo.java

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
package com.facebook.presto.operator;
1515

1616
import com.facebook.airlift.json.JsonCodec;
17-
import com.facebook.airlift.units.DataSize;
1817
import com.facebook.airlift.units.Duration;
1918
import com.facebook.drift.annotations.ThriftConstructor;
2019
import com.facebook.drift.annotations.ThriftField;
@@ -26,15 +25,12 @@
2625
import java.util.Optional;
2726

2827
import static com.facebook.airlift.json.JsonCodec.jsonCodec;
29-
import static com.facebook.airlift.units.DataSize.Unit.MEGABYTE;
30-
import static java.lang.Math.toIntExact;
3128
import static java.util.Objects.requireNonNull;
3229

3330
@ThriftStruct
3431
public class TableFinishInfo
3532
implements OperatorInfo
3633
{
37-
private static final int JSON_LENGTH_LIMIT = toIntExact(new DataSize(10, MEGABYTE).toBytes());
3834
private static final JsonCodec<Object> INFO_CODEC = jsonCodec(Object.class);
3935

4036
private final String serializedConnectorOutputMetadata;
@@ -43,20 +39,20 @@ public class TableFinishInfo
4339
private final Duration statisticsCpuTime;
4440

4541
public TableFinishInfo(Optional<ConnectorOutputMetadata> metadata, Duration statisticsWallTime, Duration statisticsCpuTime)
42+
{
43+
this(metadata, statisticsWallTime, statisticsCpuTime, 0);
44+
}
45+
46+
public TableFinishInfo(Optional<ConnectorOutputMetadata> metadata, Duration statisticsWallTime, Duration statisticsCpuTime, int jsonLengthLimit)
4647
{
4748
String serializedConnectorOutputMetadata = null;
4849
boolean jsonLengthLimitExceeded = false;
4950
if (metadata.isPresent()) {
50-
Optional<String> serializedMetadata = INFO_CODEC.toJsonWithLengthLimit(metadata.get().getInfo(), JSON_LENGTH_LIMIT);
51-
if (!serializedMetadata.isPresent()) {
52-
serializedConnectorOutputMetadata = null;
53-
jsonLengthLimitExceeded = true;
54-
}
55-
else {
56-
serializedConnectorOutputMetadata = serializedMetadata.get();
57-
jsonLengthLimitExceeded = false;
58-
}
51+
Optional<String> serializedMetadata = getSerializedMetadataWithLimit(metadata.get(), jsonLengthLimit);
52+
serializedConnectorOutputMetadata = serializedMetadata.orElse(null);
53+
jsonLengthLimitExceeded = !serializedMetadata.isPresent();
5954
}
55+
6056
this.serializedConnectorOutputMetadata = serializedConnectorOutputMetadata;
6157
this.jsonLengthLimitExceeded = jsonLengthLimitExceeded;
6258
this.statisticsWallTime = requireNonNull(statisticsWallTime, "statisticsWallTime is null");
@@ -110,4 +106,13 @@ public boolean isFinal()
110106
{
111107
return true;
112108
}
109+
110+
private static Optional<String> getSerializedMetadataWithLimit(ConnectorOutputMetadata metadata, int jsonLengthLimit)
111+
{
112+
Object metadataInfo = metadata.getInfo();
113+
if (jsonLengthLimit <= 0) {
114+
return Optional.of(INFO_CODEC.toJson(metadataInfo));
115+
}
116+
return INFO_CODEC.toJsonWithLengthLimit(metadataInfo, jsonLengthLimit);
117+
}
113118
}

presto-main-base/src/main/java/com/facebook/presto/operator/TableFinishOperator.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import java.util.function.Supplier;
4848

4949
import static com.facebook.airlift.units.Duration.succinctNanos;
50+
import static com.facebook.presto.SystemSessionProperties.TABLE_FINISH_INFO_JSON_LENGTH_LIMIT;
5051
import static com.facebook.presto.SystemSessionProperties.isStatisticsCpuTimerEnabled;
5152
import static com.facebook.presto.common.type.BigintType.BIGINT;
5253
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
@@ -114,6 +115,7 @@ public Operator createOperator(DriverContext driverContext)
114115
OperatorContext context = driverContext.addOperatorContext(operatorId, planNodeId, TableFinishOperator.class.getSimpleName());
115116
Operator statisticsAggregationOperator = statisticsAggregationOperatorFactory.createOperator(driverContext);
116117
boolean statisticsCpuTimerEnabled = !(statisticsAggregationOperator instanceof DevNullOperator) && isStatisticsCpuTimerEnabled(session);
118+
int jsonLengthLimit = session.getSystemProperty(TABLE_FINISH_INFO_JSON_LENGTH_LIMIT, Integer.class);
117119
return new TableFinishOperator(
118120
context,
119121
tableFinisher,
@@ -122,7 +124,8 @@ public Operator createOperator(DriverContext driverContext)
122124
descriptor,
123125
statisticsCpuTimerEnabled,
124126
memoryTrackingEnabled,
125-
tableCommitContextCodec);
127+
tableCommitContextCodec,
128+
jsonLengthLimit);
126129
}
127130

128131
@Override
@@ -156,6 +159,7 @@ private enum State
156159
private final TableFinisher tableFinisher;
157160
private final Operator statisticsAggregationOperator;
158161
private final StatisticAggregationsDescriptor<Integer> descriptor;
162+
private final int jsonLengthLimit;
159163

160164
private State state = State.RUNNING;
161165
private final AtomicReference<Optional<ConnectorOutputMetadata>> outputMetadata = new AtomicReference<>(Optional.empty());
@@ -181,18 +185,20 @@ public TableFinishOperator(
181185
StatisticAggregationsDescriptor<Integer> descriptor,
182186
boolean statisticsCpuTimerEnabled,
183187
boolean memoryTrackingEnabled,
184-
JsonCodec<TableCommitContext> tableCommitContextCodec)
188+
JsonCodec<TableCommitContext> tableCommitContextCodec,
189+
int jsonLengthLimit)
185190
{
186191
this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
187192
this.tableFinisher = requireNonNull(tableFinisher, "tableCommitter is null");
188193
this.statisticsAggregationOperator = requireNonNull(statisticsAggregationOperator, "statisticsAggregationOperator is null");
189194
this.descriptor = requireNonNull(descriptor, "descriptor is null");
195+
this.jsonLengthLimit = jsonLengthLimit;
190196
this.statisticsCpuTimerEnabled = statisticsCpuTimerEnabled;
191197
this.memoryTrackingEnabled = memoryTrackingEnabled;
192198
this.tableCommitContextCodec = requireNonNull(tableCommitContextCodec, "tableCommitContextCodec is null");
193199
this.lifespanAndStageStateTracker = new LifespanAndStageStateTracker(pageSinkCommitter, operatorRetainedMemoryBytes);
194200
this.systemMemoryContext = operatorContext.localSystemMemoryContext();
195-
this.tableFinishInfoSupplier = createTableFinishInfoSupplier(outputMetadata, statisticsTiming);
201+
this.tableFinishInfoSupplier = createTableFinishInfoSupplier(outputMetadata, statisticsTiming, jsonLengthLimit);
196202
operatorContext.setInfoSupplier(tableFinishInfoSupplier);
197203
}
198204

@@ -322,14 +328,15 @@ TableFinishInfo getInfo()
322328
return tableFinishInfoSupplier.get();
323329
}
324330

325-
private static Supplier<TableFinishInfo> createTableFinishInfoSupplier(AtomicReference<Optional<ConnectorOutputMetadata>> outputMetadata, OperationTiming statisticsTiming)
331+
private static Supplier<TableFinishInfo> createTableFinishInfoSupplier(AtomicReference<Optional<ConnectorOutputMetadata>> outputMetadata, OperationTiming statisticsTiming, int jsonLengthLimit)
326332
{
327333
requireNonNull(outputMetadata, "outputMetadata is null");
328334
requireNonNull(statisticsTiming, "statisticsTiming is null");
329335
return () -> new TableFinishInfo(
330336
outputMetadata.get(),
331337
succinctNanos(statisticsTiming.getWallNanos()),
332-
succinctNanos(statisticsTiming.getCpuNanos()));
338+
succinctNanos(statisticsTiming.getCpuNanos()),
339+
jsonLengthLimit);
333340
}
334341

335342
@Override

presto-main-base/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@
123123
import com.facebook.presto.operator.PagesIndex;
124124
import com.facebook.presto.operator.SourceOperatorFactory;
125125
import com.facebook.presto.operator.TableCommitContext;
126+
import com.facebook.presto.operator.TableFinishConfig;
126127
import com.facebook.presto.operator.TaskContext;
127128
import com.facebook.presto.operator.index.IndexJoinLookupStats;
128129
import com.facebook.presto.server.NodeStatusNotificationManager;
@@ -454,7 +455,8 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig,
454455
new NodeSpillConfig(),
455456
new TracingConfig(),
456457
new CompilerConfig(),
457-
new HistoryBasedOptimizationConfig()).getSessionProperties(),
458+
new HistoryBasedOptimizationConfig(),
459+
new TableFinishConfig()).getSessionProperties(),
458460
new JavaFeaturesConfig(),
459461
nodeSpillConfig),
460462
new SchemaPropertyManager(),

presto-main-base/src/test/java/com/facebook/presto/execution/TestCreateMaterializedViewTask.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import com.facebook.presto.metadata.FunctionAndTypeManager;
3333
import com.facebook.presto.metadata.SessionPropertyManager;
3434
import com.facebook.presto.metadata.TablePropertyManager;
35+
import com.facebook.presto.operator.TableFinishConfig;
3536
import com.facebook.presto.spi.ColumnHandle;
3637
import com.facebook.presto.spi.ColumnMetadata;
3738
import com.facebook.presto.spi.ConnectorId;
@@ -358,7 +359,8 @@ private static SessionPropertyManager createSessionPropertyManager()
358359
new NodeSpillConfig(),
359360
new TracingConfig(),
360361
new CompilerConfig(),
361-
new HistoryBasedOptimizationConfig()).getSessionProperties(),
362+
new HistoryBasedOptimizationConfig(),
363+
new TableFinishConfig()).getSessionProperties(),
362364
featuresConfig,
363365
new JavaFeaturesConfig(),
364366
new NodeSpillConfig());

presto-main-base/src/test/java/com/facebook/presto/execution/scheduler/TestAdaptivePhasedExecutionPolicy.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.facebook.presto.failureDetector.NoOpFailureDetector;
2929
import com.facebook.presto.memory.MemoryManagerConfig;
3030
import com.facebook.presto.memory.NodeMemoryConfig;
31+
import com.facebook.presto.operator.TableFinishConfig;
3132
import com.facebook.presto.spi.ConnectorId;
3233
import com.facebook.presto.spi.QueryId;
3334
import com.facebook.presto.spi.TableHandle;
@@ -106,7 +107,8 @@ public void testCreateExecutionSchedule()
106107
new NodeSpillConfig(),
107108
new TracingConfig(),
108109
new CompilerConfig(),
109-
new HistoryBasedOptimizationConfig()))).build();
110+
new HistoryBasedOptimizationConfig(),
111+
new TableFinishConfig()))).build();
110112
AdaptivePhasedExecutionPolicy policy = new AdaptivePhasedExecutionPolicy();
111113
Collection<StageExecutionAndScheduler> schedulers = getStageExecutionAndSchedulers(4);
112114
assertTrue(policy.createExecutionSchedule(session, schedulers) instanceof AllAtOnceExecutionSchedule);
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.operator;
15+
16+
import com.google.common.collect.ImmutableMap;
17+
import org.testng.annotations.Test;
18+
19+
import java.util.Map;
20+
21+
import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
22+
import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
23+
import static com.facebook.airlift.configuration.testing.ConfigAssertions.recordDefaults;
24+
25+
public class TestTableFinishConfig
26+
{
27+
@Test
28+
public void testDefaults()
29+
{
30+
assertRecordedDefaults(recordDefaults(TableFinishConfig.class)
31+
.setTableFinishInfoJsonLengthLimit(10_485_760));
32+
}
33+
34+
@Test
35+
public void testExplicitPropertyMappings()
36+
{
37+
Map<String, String> properties = new ImmutableMap.Builder<String, String>()
38+
.put("table-finish-info-json-length-limit", "5000000")
39+
.build();
40+
41+
TableFinishConfig expected = new TableFinishConfig()
42+
.setTableFinishInfoJsonLengthLimit(5_000_000);
43+
44+
assertFullMapping(properties, expected);
45+
}
46+
}

presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/TestAnalyzer.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.facebook.presto.execution.warnings.WarningCollectorConfig;
2323
import com.facebook.presto.memory.MemoryManagerConfig;
2424
import com.facebook.presto.memory.NodeMemoryConfig;
25+
import com.facebook.presto.operator.TableFinishConfig;
2526
import com.facebook.presto.spi.PrestoWarning;
2627
import com.facebook.presto.spi.StandardWarningCode;
2728
import com.facebook.presto.spi.WarningCollector;
@@ -244,7 +245,8 @@ public void testWindowOrderByAnalysis()
244245
new NodeSpillConfig(),
245246
new TracingConfig(),
246247
new CompilerConfig(),
247-
new HistoryBasedOptimizationConfig()))).build();
248+
new HistoryBasedOptimizationConfig(),
249+
new TableFinishConfig()))).build();
248250
assertFails(session, WINDOW_FUNCTION_ORDERBY_LITERAL,
249251
"SELECT SUM(x) OVER (PARTITION BY y ORDER BY 1) AS s\n" +
250252
"FROM (values (1,10), (2, 10)) AS T(x, y)");
@@ -654,7 +656,8 @@ public void testTooManyGroupingElements()
654656
new NodeSpillConfig(),
655657
new TracingConfig(),
656658
new CompilerConfig(),
657-
new HistoryBasedOptimizationConfig()))).build();
659+
new HistoryBasedOptimizationConfig(),
660+
new TableFinishConfig()))).build();
658661
analyze(session, "SELECT a, b, c, d, e, f, g, h, i, j, k, SUM(l)" +
659662
"FROM (VALUES (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12))\n" +
660663
"t (a, b, c, d, e, f, g, h, i, j, k, l)\n" +

presto-main-base/src/test/java/com/facebook/presto/sql/planner/sanity/TestValidateStreamingJoins.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.facebook.presto.memory.MemoryManagerConfig;
2525
import com.facebook.presto.memory.NodeMemoryConfig;
2626
import com.facebook.presto.metadata.Metadata;
27+
import com.facebook.presto.operator.TableFinishConfig;
2728
import com.facebook.presto.spi.ColumnHandle;
2829
import com.facebook.presto.spi.ConnectorId;
2930
import com.facebook.presto.spi.TableHandle;
@@ -89,7 +90,8 @@ public void setup()
8990
new NodeSpillConfig(),
9091
new TracingConfig(),
9192
new CompilerConfig(),
92-
new HistoryBasedOptimizationConfig())))
93+
new HistoryBasedOptimizationConfig(),
94+
new TableFinishConfig())))
9395
.setCatalog("local")
9496
.setSchema("tiny")
9597
.setSystemProperty("spill_enabled", "true")

0 commit comments

Comments
 (0)