Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@
<module>presto-hudi</module>
<module>presto-native-execution</module>
<module>presto-native-tests</module>
<module>presto-native-tvf</module>
<module>presto-router</module>
<module>presto-open-telemetry</module>
<module>redis-hbo-provider</module>
Expand Down Expand Up @@ -1191,6 +1192,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-native-tvf</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.facebook.hive</groupId>
<artifactId>hive-dwrf</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,6 @@ public Field(Optional<NodeLocation> nodeLocation, Optional<QualifiedName> relati
this.aliased = aliased;
}

public static Field newUnqualified(Optional<String> name, Type type)
{
requireNonNull(name, "name is null");
requireNonNull(type, "type is null");

return new Field(Optional.empty(), Optional.empty(), name, type, false, Optional.empty(), Optional.empty(), false);
}

public Optional<NodeLocation> getNodeLocation()
{
return nodeLocation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@

import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT;
import static com.facebook.presto.spi.function.table.ReturnTypeSpecification.GenericTable.GENERIC_TABLE;
import static com.facebook.presto.spi.function.table.GenericTableReturnTypeSpecification.GENERIC_TABLE;
import static java.util.Objects.requireNonNull;

public class QueryFunctionProvider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.facebook.presto.common.block.BlockBuilderStatus;
import com.facebook.presto.common.block.RowBlockBuilder;
import com.facebook.presto.common.function.SqlFunctionProperties;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -41,13 +43,26 @@ public class RowType
private final Optional<TypeSignature> typeSignature;

private RowType(List<Field> fields)
{
this(fields, fields.stream()
.map(Field::getType)
.collect(toList()));
}

private RowType(List<Field> fields, List<Type> fieldTypes)
{
this(fields, fieldTypes, containsDistinctType(fieldTypes) ? Optional.empty() : Optional.of(makeSignature(fields)));
}

@JsonCreator
public RowType(List<Field> fields,
List<Type> fieldTypes,
@JsonProperty("typeSignature") Optional<TypeSignature> typeSignature)
{
super(Block.class);
this.fields = fields;
this.fieldTypes = fields.stream()
.map(Field::getType)
.collect(toList());
this.typeSignature = containsDistinctType(this.fieldTypes) ? Optional.empty() : Optional.of(makeSignature(fields));
this.fieldTypes = fieldTypes;
this.typeSignature = typeSignature;
}

public static RowType from(List<Field> fields)
Expand Down Expand Up @@ -120,6 +135,12 @@ public BlockBuilder createBlockBuilder(BlockBuilderStatus blockBuilderStatus, in
return new RowBlockBuilder(getTypeParameters(), blockBuilderStatus, expectedEntries);
}

@JsonProperty
public List<Field> getFields()
{
return fields;
}

@Override
public String getDisplayName()
{
Expand Down Expand Up @@ -193,18 +214,13 @@ public List<Type> getTypeParameters()
return fieldTypes;
}

public List<Field> getFields()
{
return fields;
}

public static class Field
{
private final Type type;
private final Optional<String> name;
private final boolean delimited;

public Field(Optional<String> name, Type type)
public Field(@JsonProperty("name") Optional<String> name, @JsonProperty("type") Type type)
{
this(name, type, false);
}
Expand All @@ -216,11 +232,13 @@ public Field(Optional<String> name, Type type, boolean delimited)
this.delimited = delimited;
}

@JsonProperty
public Type getType()
{
return type;
}

@JsonProperty
public Optional<String> getName()
{
return name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.function.table.ConnectorTableFunction;
import com.facebook.presto.spi.function.table.ConnectorTableFunctionHandle;
import com.facebook.presto.spi.function.table.TableFunctionProcessorProvider;
import com.facebook.presto.spi.procedure.BaseProcedure;
import com.facebook.presto.spi.procedure.DistributedProcedure;
import com.facebook.presto.spi.procedure.Procedure;
Expand Down Expand Up @@ -86,6 +88,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import static com.facebook.presto.metadata.FunctionExtractor.extractFunctions;
import static com.facebook.presto.spi.ConnectorId.createInformationSchemaConnectorId;
Expand Down Expand Up @@ -215,6 +218,12 @@ public synchronized void addConnectorFactory(ConnectorFactory connectorFactory)
ConnectorFactory existingConnectorFactory = connectorFactories.putIfAbsent(connectorFactory.getName(), connectorFactory);
checkArgument(existingConnectorFactory == null, "Connector %s is already registered", connectorFactory.getName());
handleResolver.addConnectorName(connectorFactory.getName(), connectorFactory.getHandleResolver());
connectorFactory.getTableFunctionHandleResolver().ifPresent(resolver -> {
handleResolver.addTableFunctionNamespace(connectorFactory.getName(), resolver);
});
connectorFactory.getTableFunctionSplitResolver().ifPresent(resolver -> {
handleResolver.addTableFunctionSplitNamespace(connectorFactory.getName(), resolver);
});
}

public synchronized ConnectorId createConnection(String catalogName, String connectorName, Map<String, String> properties)
Expand Down Expand Up @@ -334,6 +343,7 @@ private synchronized void addConnectorInternal(MaterializedConnector connector)
metadataManager.getAnalyzePropertyManager().addProperties(connectorId, connector.getAnalyzeProperties());
metadataManager.getSessionPropertyManager().addConnectorSessionProperties(connectorId, connector.getSessionProperties());
metadataManager.getFunctionAndTypeManager().getTableFunctionRegistry().addTableFunctions(connectorId, connector.getTableFunctions());
metadataManager.getFunctionAndTypeManager().addTableFunctionProcessorProvider(connectorId, connector.getTableFunctionProcessorProvider());
}

public synchronized void dropConnection(String catalogName)
Expand All @@ -346,6 +356,7 @@ public synchronized void dropConnection(String catalogName)
removeConnectorInternal(createInformationSchemaConnectorId(connectorId));
removeConnectorInternal(createSystemTablesConnectorId(connectorId));
metadataManager.getFunctionAndTypeManager().getTableFunctionRegistry().removeTableFunctions(connectorId);
metadataManager.getFunctionAndTypeManager().removeTableFunctionProcessorProvider(connectorId);
});
}

Expand Down Expand Up @@ -422,6 +433,7 @@ private static class MaterializedConnector

private final Set<Class<?>> functions;
private final Set<ConnectorTableFunction> connectorTableFunctions;
private final Function<ConnectorTableFunctionHandle, TableFunctionProcessorProvider> connectorTableFunctionProcessorProvider;
private final ConnectorPageSourceProvider pageSourceProvider;
private final Optional<ConnectorPageSinkProvider> pageSinkProvider;
private final Optional<ConnectorIndexProvider> indexProvider;
Expand Down Expand Up @@ -459,6 +471,7 @@ public MaterializedConnector(ConnectorId connectorId, Connector connector)
Set<ConnectorTableFunction> connectorTableFunctions = connector.getTableFunctions();
requireNonNull(connectorTableFunctions, format("Connector '%s' returned a null table functions set", connectorId));
this.connectorTableFunctions = ImmutableSet.copyOf(connectorTableFunctions);
this.connectorTableFunctionProcessorProvider = connector.getTableFunctionProcessorProvider();

ConnectorPageSourceProvider connectorPageSourceProvider = null;
try {
Expand Down Expand Up @@ -660,5 +673,10 @@ public Set<ConnectorTableFunction> getTableFunctions()
{
return connectorTableFunctions;
}

public Function<ConnectorTableFunctionHandle, TableFunctionProcessorProvider> getTableFunctionProcessorProvider()
{
return connectorTableFunctionProcessorProvider;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,22 @@

import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.common.transaction.TransactionId;
import com.facebook.presto.metadata.FunctionAndTypeManager;
import com.facebook.presto.operator.table.ExcludeColumns;
import com.facebook.presto.operator.table.Sequence;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableLayout;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
import com.facebook.presto.spi.ConnectorTableLayoutResult;
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SchemaTablePrefix;
import com.facebook.presto.spi.SplitContext;
Expand All @@ -34,6 +39,9 @@
import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.spi.function.table.ConnectorTableFunction;
import com.facebook.presto.spi.function.table.ConnectorTableFunctionHandle;
import com.facebook.presto.spi.function.table.TableFunctionProcessorProvider;
import com.facebook.presto.spi.procedure.Procedure;
import com.facebook.presto.spi.transaction.IsolationLevel;
import com.facebook.presto.transaction.InternalConnector;
Expand All @@ -45,6 +53,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;

import static java.util.Objects.requireNonNull;

Expand All @@ -56,12 +65,18 @@ public class GlobalSystemConnector
private final String connectorId;
private final Set<SystemTable> systemTables;
private final Set<Procedure> procedures;
private final Set<ConnectorTableFunction> tableFunctions;
private final NodeManager nodeManager;
private final FunctionAndTypeManager functionAndTypeManager;

public GlobalSystemConnector(String connectorId, Set<SystemTable> systemTables, Set<Procedure> procedures)
public GlobalSystemConnector(String connectorId, Set<SystemTable> systemTables, Set<Procedure> procedures, Set<ConnectorTableFunction> tableFunctions, NodeManager nodeManager, FunctionAndTypeManager functionAndTypeManager)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
this.systemTables = ImmutableSet.copyOf(requireNonNull(systemTables, "systemTables is null"));
this.procedures = ImmutableSet.copyOf(requireNonNull(procedures, "procedures is null"));
this.tableFunctions = ImmutableSet.copyOf(requireNonNull(tableFunctions, "tableFunctions is null"));
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.functionAndTypeManager = requireNonNull(functionAndTypeManager, "functionAndTypeManager is null");
}

@Override
Expand Down Expand Up @@ -138,8 +153,18 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
@Override
public ConnectorSplitManager getSplitManager()
{
return (transactionHandle, session, layout, splitSchedulingContext) -> {
throw new UnsupportedOperationException();
return new ConnectorSplitManager() {
@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout, SplitSchedulingContext splitSchedulingContext)
{
throw new UnsupportedOperationException();
}

@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableFunctionHandle function)
{
return function.getSplits(transaction, session, nodeManager, functionAndTypeManager);
}
};
}

Expand All @@ -166,4 +191,24 @@ public Set<Procedure> getProcedures()
{
return procedures;
}

@Override
public Set<ConnectorTableFunction> getTableFunctions()
{
return tableFunctions;
}

@Override
public Function<ConnectorTableFunctionHandle, TableFunctionProcessorProvider> getTableFunctionProcessorProvider()
{
return connectorTableFunctionHandle -> {
if (connectorTableFunctionHandle instanceof ExcludeColumns.ExcludeColumnsFunctionHandle) {
return ExcludeColumns.getExcludeColumnsFunctionProcessorProvider();
}
else if (connectorTableFunctionHandle instanceof Sequence.SequenceFunctionHandle) {
return Sequence.getSequenceFunctionProcessorProvider();
}
return null;
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@
*/
package com.facebook.presto.connector.system;

import com.facebook.presto.metadata.FunctionAndTypeManager;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.SystemTable;
import com.facebook.presto.spi.connector.Connector;
import com.facebook.presto.spi.connector.ConnectorContext;
import com.facebook.presto.spi.connector.ConnectorFactory;
import com.facebook.presto.spi.function.table.ConnectorTableFunction;
import com.facebook.presto.spi.procedure.Procedure;
import com.google.common.collect.ImmutableSet;
import jakarta.inject.Inject;
Expand All @@ -32,12 +35,18 @@ public class GlobalSystemConnectorFactory
{
private final Set<SystemTable> tables;
private final Set<Procedure> procedures;
private final Set<ConnectorTableFunction> tableFunctions;
private final NodeManager nodeManager;
private final FunctionAndTypeManager functionAndTypeManager;

@Inject
public GlobalSystemConnectorFactory(Set<SystemTable> tables, Set<Procedure> procedures)
public GlobalSystemConnectorFactory(Set<SystemTable> tables, Set<Procedure> procedures, Set<ConnectorTableFunction> tableFunctions, NodeManager nodeManager, FunctionAndTypeManager functionAndTypeManager)
{
this.tables = ImmutableSet.copyOf(requireNonNull(tables, "tables is null"));
this.procedures = ImmutableSet.copyOf(requireNonNull(procedures, "procedures is null"));
this.tableFunctions = ImmutableSet.copyOf(requireNonNull(tableFunctions, "tableFunctions is null"));
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.functionAndTypeManager = requireNonNull(functionAndTypeManager, "functionAndTypeManager is null");
}

@Override
Expand All @@ -55,6 +64,6 @@ public ConnectorHandleResolver getHandleResolver()
@Override
public Connector create(String catalogName, Map<String, String> config, ConnectorContext context)
{
return new GlobalSystemConnector(catalogName, tables, procedures);
return new GlobalSystemConnector(catalogName, tables, procedures, tableFunctions, nodeManager, functionAndTypeManager);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@
import com.facebook.presto.connector.system.jdbc.TableTypeJdbcTable;
import com.facebook.presto.connector.system.jdbc.TypesJdbcTable;
import com.facebook.presto.connector.system.jdbc.UdtJdbcTable;
import com.facebook.presto.metadata.FunctionAndTypeManager;
import com.facebook.presto.nodeManager.PluginNodeManager;
import com.facebook.presto.operator.table.ExcludeColumns;
import com.facebook.presto.operator.table.Sequence;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.SystemTable;
import com.facebook.presto.spi.function.table.ConnectorTableFunction;
import com.facebook.presto.spi.procedure.Procedure;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder;
Expand Down Expand Up @@ -77,6 +83,13 @@ public void configure(Binder binder)

binder.bind(GlobalSystemConnectorFactory.class).in(Scopes.SINGLETON);
binder.bind(SystemConnectorRegistrar.class).asEagerSingleton();
binder.bind(PluginNodeManager.class).in(Scopes.SINGLETON);
binder.bind(NodeManager.class).to(PluginNodeManager.class).in(Scopes.SINGLETON);
binder.bind(FunctionAndTypeManager.class).in(Scopes.SINGLETON);

Multibinder<ConnectorTableFunction> tableFunctions = Multibinder.newSetBinder(binder, ConnectorTableFunction.class);
tableFunctions.addBinding().toProvider(ExcludeColumns.class).in(Scopes.SINGLETON);
tableFunctions.addBinding().toProvider(Sequence.class).in(Scopes.SINGLETON);
}

@ProvidesIntoSet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ private PlanRoot runCreateLogicalPlanAsync()

private void createQueryScheduler(PlanRoot plan)
{
CloseableSplitSourceProvider splitSourceProvider = new CloseableSplitSourceProvider(splitManager::getSplits);
CloseableSplitSourceProvider splitSourceProvider = new CloseableSplitSourceProvider(splitManager);

// ensure split sources are closed
stateMachine.addStateChangeListener(state -> {
Expand Down
Loading
Loading