Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions presto-docs/src/main/sphinx/admin/properties-session.rst
Original file line number Diff line number Diff line change
Expand Up @@ -581,3 +581,14 @@ The corresponding configuration property is :ref:`admin/properties:\`\`experimen
.. warning::

Materialized views are experimental. The SPI and behavior may change in future releases.

``table_finish_info_json_length_limit``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``int``
* **Default value:** ``10,000,000``

The maximum length of the JSON-serialized ConnectorOutputMetadata string in
TableFinishInfo. If the length is exceeded, then the Metadata is omitted.

When set to a non-positive value, the length limit is not enforced.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.facebook.presto.execution.warnings.WarningCollectorConfig;
import com.facebook.presto.memory.MemoryManagerConfig;
import com.facebook.presto.memory.NodeMemoryConfig;
import com.facebook.presto.operator.TableFinishConfig;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.eventlistener.CTEInformation;
import com.facebook.presto.spi.security.ViewSecurity;
Expand Down Expand Up @@ -133,6 +134,7 @@ public final class SystemSessionProperties
public static final String SCALE_WRITERS = "scale_writers";
public static final String WRITER_MIN_SIZE = "writer_min_size";
public static final String OPTIMIZED_SCALE_WRITER_PRODUCER_BUFFER = "optimized_scale_writer_producer_buffer";
public static final String TABLE_FINISH_INFO_JSON_LENGTH_LIMIT = "table_finish_info_json_length_limit";
public static final String PUSH_TABLE_WRITE_THROUGH_UNION = "push_table_write_through_union";
public static final String EXECUTION_POLICY = "execution_policy";
public static final String DICTIONARY_AGGREGATION = "dictionary_aggregation";
Expand Down Expand Up @@ -382,7 +384,8 @@ public SystemSessionProperties()
new NodeSpillConfig(),
new TracingConfig(),
new CompilerConfig(),
new HistoryBasedOptimizationConfig());
new HistoryBasedOptimizationConfig(),
new TableFinishConfig());
}

@Inject
Expand All @@ -398,7 +401,8 @@ public SystemSessionProperties(
NodeSpillConfig nodeSpillConfig,
TracingConfig tracingConfig,
CompilerConfig compilerConfig,
HistoryBasedOptimizationConfig historyBasedOptimizationConfig)
HistoryBasedOptimizationConfig historyBasedOptimizationConfig,
TableFinishConfig tableFinishConfig)
{
sessionProperties = ImmutableList.of(
integerProperty(
Expand Down Expand Up @@ -560,6 +564,11 @@ public SystemSessionProperties(
"Optimize scale writer creation based on producer buffer",
featuresConfig.isOptimizedScaleWriterProducerBuffer(),
true),
integerProperty(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider using dataSizeProperty, like QUERY_MAX_WRITTEN_INTERMEDIATE_BYTES ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I debated doing this! I think integer is actually better:

  1. Strictly speaking, we pass this down NFO_CODEC.toJsonWithLengthLimit(metadataInfo, jsonLengthLimit);. toJsonWithLengthLimit checks the number of characters, not the number of bytes. That is, integer is technically "more correct"

  2. I want to support disabling the feature when passing a non-positive value. DataSize requires you specify units (e.g. 0MB, 0GB), which is awkward if you just want to disable the feature.

TABLE_FINISH_INFO_JSON_LENGTH_LIMIT,
"Maximum number of characters in connector output metadata JSON in table finish info",
tableFinishConfig.getTableFinishInfoJsonLengthLimit(),
false),
booleanProperty(
PUSH_TABLE_WRITE_THROUGH_UNION,
"Parallelize writes when using UNION ALL in queries that write data",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.operator;

import com.facebook.airlift.configuration.Config;
import com.facebook.airlift.configuration.ConfigDescription;
import jakarta.validation.constraints.NotNull;

public class TableFinishConfig
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't find a better config file to put this config in, so I made my own. I'm open to suggestions on where else to put this!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will say, either FeaturesConfig, or make the name of this config general to incorporate similar needs in other operators?

{
private int tableFinishInfoJsonLengthLimit = 10_000_000;

@NotNull
public int getTableFinishInfoJsonLengthLimit()
{
return tableFinishInfoJsonLengthLimit;
}

@Config("table-finish-info-json-length-limit")
@ConfigDescription("Maximum number of characters in connector output metadata JSON in table finish info")
public TableFinishConfig setTableFinishInfoJsonLengthLimit(int tableFinishInfoJsonLengthLimit)
{
this.tableFinishInfoJsonLengthLimit = tableFinishInfoJsonLengthLimit;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package com.facebook.presto.operator;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.units.DataSize;
import com.facebook.airlift.units.Duration;
import com.facebook.drift.annotations.ThriftConstructor;
import com.facebook.drift.annotations.ThriftField;
Expand All @@ -26,37 +25,29 @@
import java.util.Optional;

import static com.facebook.airlift.json.JsonCodec.jsonCodec;
import static com.facebook.airlift.units.DataSize.Unit.MEGABYTE;
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;

@ThriftStruct
public class TableFinishInfo
implements OperatorInfo
{
private static final int JSON_LENGTH_LIMIT = toIntExact(new DataSize(10, MEGABYTE).toBytes());
private static final JsonCodec<Object> INFO_CODEC = jsonCodec(Object.class);

private final String serializedConnectorOutputMetadata;
private final boolean jsonLengthLimitExceeded;
private final Duration statisticsWallTime;
private final Duration statisticsCpuTime;

public TableFinishInfo(Optional<ConnectorOutputMetadata> metadata, Duration statisticsWallTime, Duration statisticsCpuTime)
public TableFinishInfo(Optional<ConnectorOutputMetadata> metadata, Duration statisticsWallTime, Duration statisticsCpuTime, int jsonLengthLimit)
{
String serializedConnectorOutputMetadata = null;
boolean jsonLengthLimitExceeded = false;
if (metadata.isPresent()) {
Optional<String> serializedMetadata = INFO_CODEC.toJsonWithLengthLimit(metadata.get().getInfo(), JSON_LENGTH_LIMIT);
if (!serializedMetadata.isPresent()) {
serializedConnectorOutputMetadata = null;
jsonLengthLimitExceeded = true;
}
else {
serializedConnectorOutputMetadata = serializedMetadata.get();
jsonLengthLimitExceeded = false;
}
Optional<String> serializedMetadata = getSerializedMetadataWithLimit(metadata.get(), jsonLengthLimit);
serializedConnectorOutputMetadata = serializedMetadata.orElse(null);
jsonLengthLimitExceeded = !serializedMetadata.isPresent();
}

this.serializedConnectorOutputMetadata = serializedConnectorOutputMetadata;
this.jsonLengthLimitExceeded = jsonLengthLimitExceeded;
this.statisticsWallTime = requireNonNull(statisticsWallTime, "statisticsWallTime is null");
Expand Down Expand Up @@ -110,4 +101,13 @@ public boolean isFinal()
{
return true;
}

private static Optional<String> getSerializedMetadataWithLimit(ConnectorOutputMetadata metadata, int jsonLengthLimit)
{
Object metadataInfo = metadata.getInfo();
if (jsonLengthLimit <= 0) {
return Optional.of(INFO_CODEC.toJson(metadataInfo));
}
return INFO_CODEC.toJsonWithLengthLimit(metadataInfo, jsonLengthLimit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.function.Supplier;

import static com.facebook.airlift.units.Duration.succinctNanos;
import static com.facebook.presto.SystemSessionProperties.TABLE_FINISH_INFO_JSON_LENGTH_LIMIT;
import static com.facebook.presto.SystemSessionProperties.isStatisticsCpuTimerEnabled;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
Expand Down Expand Up @@ -114,6 +115,7 @@ public Operator createOperator(DriverContext driverContext)
OperatorContext context = driverContext.addOperatorContext(operatorId, planNodeId, TableFinishOperator.class.getSimpleName());
Operator statisticsAggregationOperator = statisticsAggregationOperatorFactory.createOperator(driverContext);
boolean statisticsCpuTimerEnabled = !(statisticsAggregationOperator instanceof DevNullOperator) && isStatisticsCpuTimerEnabled(session);
int jsonLengthLimit = session.getSystemProperty(TABLE_FINISH_INFO_JSON_LENGTH_LIMIT, Integer.class);
return new TableFinishOperator(
context,
tableFinisher,
Expand All @@ -122,7 +124,8 @@ public Operator createOperator(DriverContext driverContext)
descriptor,
statisticsCpuTimerEnabled,
memoryTrackingEnabled,
tableCommitContextCodec);
tableCommitContextCodec,
jsonLengthLimit);
}

@Override
Expand Down Expand Up @@ -156,6 +159,7 @@ private enum State
private final TableFinisher tableFinisher;
private final Operator statisticsAggregationOperator;
private final StatisticAggregationsDescriptor<Integer> descriptor;
private final int jsonLengthLimit;

private State state = State.RUNNING;
private final AtomicReference<Optional<ConnectorOutputMetadata>> outputMetadata = new AtomicReference<>(Optional.empty());
Expand All @@ -181,18 +185,20 @@ public TableFinishOperator(
StatisticAggregationsDescriptor<Integer> descriptor,
boolean statisticsCpuTimerEnabled,
boolean memoryTrackingEnabled,
JsonCodec<TableCommitContext> tableCommitContextCodec)
JsonCodec<TableCommitContext> tableCommitContextCodec,
int jsonLengthLimit)
{
this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
this.tableFinisher = requireNonNull(tableFinisher, "tableCommitter is null");
this.statisticsAggregationOperator = requireNonNull(statisticsAggregationOperator, "statisticsAggregationOperator is null");
this.descriptor = requireNonNull(descriptor, "descriptor is null");
this.jsonLengthLimit = jsonLengthLimit;
this.statisticsCpuTimerEnabled = statisticsCpuTimerEnabled;
this.memoryTrackingEnabled = memoryTrackingEnabled;
this.tableCommitContextCodec = requireNonNull(tableCommitContextCodec, "tableCommitContextCodec is null");
this.lifespanAndStageStateTracker = new LifespanAndStageStateTracker(pageSinkCommitter, operatorRetainedMemoryBytes);
this.systemMemoryContext = operatorContext.localSystemMemoryContext();
this.tableFinishInfoSupplier = createTableFinishInfoSupplier(outputMetadata, statisticsTiming);
this.tableFinishInfoSupplier = createTableFinishInfoSupplier(outputMetadata, statisticsTiming, jsonLengthLimit);
operatorContext.setInfoSupplier(tableFinishInfoSupplier);
}

Expand Down Expand Up @@ -322,14 +328,15 @@ TableFinishInfo getInfo()
return tableFinishInfoSupplier.get();
}

private static Supplier<TableFinishInfo> createTableFinishInfoSupplier(AtomicReference<Optional<ConnectorOutputMetadata>> outputMetadata, OperationTiming statisticsTiming)
private static Supplier<TableFinishInfo> createTableFinishInfoSupplier(AtomicReference<Optional<ConnectorOutputMetadata>> outputMetadata, OperationTiming statisticsTiming, int jsonLengthLimit)
{
requireNonNull(outputMetadata, "outputMetadata is null");
requireNonNull(statisticsTiming, "statisticsTiming is null");
return () -> new TableFinishInfo(
outputMetadata.get(),
succinctNanos(statisticsTiming.getWallNanos()),
succinctNanos(statisticsTiming.getCpuNanos()));
succinctNanos(statisticsTiming.getCpuNanos()),
jsonLengthLimit);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
import com.facebook.presto.operator.PagesIndex;
import com.facebook.presto.operator.SourceOperatorFactory;
import com.facebook.presto.operator.TableCommitContext;
import com.facebook.presto.operator.TableFinishConfig;
import com.facebook.presto.operator.TaskContext;
import com.facebook.presto.operator.index.IndexJoinLookupStats;
import com.facebook.presto.server.NodeStatusNotificationManager;
Expand Down Expand Up @@ -454,7 +455,8 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig,
new NodeSpillConfig(),
new TracingConfig(),
new CompilerConfig(),
new HistoryBasedOptimizationConfig()).getSessionProperties(),
new HistoryBasedOptimizationConfig(),
new TableFinishConfig()).getSessionProperties(),
new JavaFeaturesConfig(),
nodeSpillConfig),
new SchemaPropertyManager(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.facebook.presto.metadata.FunctionAndTypeManager;
import com.facebook.presto.metadata.SessionPropertyManager;
import com.facebook.presto.metadata.TablePropertyManager;
import com.facebook.presto.operator.TableFinishConfig;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorId;
Expand Down Expand Up @@ -358,7 +359,8 @@ private static SessionPropertyManager createSessionPropertyManager()
new NodeSpillConfig(),
new TracingConfig(),
new CompilerConfig(),
new HistoryBasedOptimizationConfig()).getSessionProperties(),
new HistoryBasedOptimizationConfig(),
new TableFinishConfig()).getSessionProperties(),
featuresConfig,
new JavaFeaturesConfig(),
new NodeSpillConfig());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.facebook.presto.failureDetector.NoOpFailureDetector;
import com.facebook.presto.memory.MemoryManagerConfig;
import com.facebook.presto.memory.NodeMemoryConfig;
import com.facebook.presto.operator.TableFinishConfig;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.TableHandle;
Expand Down Expand Up @@ -106,7 +107,8 @@ public void testCreateExecutionSchedule()
new NodeSpillConfig(),
new TracingConfig(),
new CompilerConfig(),
new HistoryBasedOptimizationConfig()))).build();
new HistoryBasedOptimizationConfig(),
new TableFinishConfig()))).build();
AdaptivePhasedExecutionPolicy policy = new AdaptivePhasedExecutionPolicy();
Collection<StageExecutionAndScheduler> schedulers = getStageExecutionAndSchedulers(4);
assertTrue(policy.createExecutionSchedule(session, schedulers) instanceof AllAtOnceExecutionSchedule);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.operator;

import com.google.common.collect.ImmutableMap;
import org.testng.annotations.Test;

import java.util.Map;

import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static com.facebook.airlift.configuration.testing.ConfigAssertions.recordDefaults;

public class TestTableFinishConfig
{
@Test
public void testDefaults()
{
assertRecordedDefaults(recordDefaults(TableFinishConfig.class)
.setTableFinishInfoJsonLengthLimit(10_000_000));
}

@Test
public void testExplicitPropertyMappings()
{
Map<String, String> properties = new ImmutableMap.Builder<String, String>()
.put("table-finish-info-json-length-limit", "5000000")
.build();

TableFinishConfig expected = new TableFinishConfig()
.setTableFinishInfoJsonLengthLimit(5_000_000);

assertFullMapping(properties, expected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.facebook.presto.execution.warnings.WarningCollectorConfig;
import com.facebook.presto.memory.MemoryManagerConfig;
import com.facebook.presto.memory.NodeMemoryConfig;
import com.facebook.presto.operator.TableFinishConfig;
import com.facebook.presto.spi.PrestoWarning;
import com.facebook.presto.spi.StandardWarningCode;
import com.facebook.presto.spi.WarningCollector;
Expand Down Expand Up @@ -244,7 +245,8 @@ public void testWindowOrderByAnalysis()
new NodeSpillConfig(),
new TracingConfig(),
new CompilerConfig(),
new HistoryBasedOptimizationConfig()))).build();
new HistoryBasedOptimizationConfig(),
new TableFinishConfig()))).build();
assertFails(session, WINDOW_FUNCTION_ORDERBY_LITERAL,
"SELECT SUM(x) OVER (PARTITION BY y ORDER BY 1) AS s\n" +
"FROM (values (1,10), (2, 10)) AS T(x, y)");
Expand Down Expand Up @@ -654,7 +656,8 @@ public void testTooManyGroupingElements()
new NodeSpillConfig(),
new TracingConfig(),
new CompilerConfig(),
new HistoryBasedOptimizationConfig()))).build();
new HistoryBasedOptimizationConfig(),
new TableFinishConfig()))).build();
analyze(session, "SELECT a, b, c, d, e, f, g, h, i, j, k, SUM(l)" +
"FROM (VALUES (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12))\n" +
"t (a, b, c, d, e, f, g, h, i, j, k, l)\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.facebook.presto.memory.MemoryManagerConfig;
import com.facebook.presto.memory.NodeMemoryConfig;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.operator.TableFinishConfig;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.TableHandle;
Expand Down Expand Up @@ -89,7 +90,8 @@ public void setup()
new NodeSpillConfig(),
new TracingConfig(),
new CompilerConfig(),
new HistoryBasedOptimizationConfig())))
new HistoryBasedOptimizationConfig(),
new TableFinishConfig())))
.setCatalog("local")
.setSchema("tiny")
.setSystemProperty("spill_enabled", "true")
Expand Down
Loading
Loading