catalogFunctions = tableFunctions.get(connectorId);
+ if (catalogFunctions != null) {
+ String lowercasedSchemaName = name.getSchemaFunctionName().getSchemaName().toLowerCase(ENGLISH);
+ String lowercasedFunctionName = name.getSchemaFunctionName().getFunctionName().toLowerCase(ENGLISH);
+ TableFunctionMetadata function = catalogFunctions.get(new SchemaFunctionName(lowercasedSchemaName, lowercasedFunctionName));
+ if (function != null) {
+ return function;
+ }
+ }
+
+ throw new PrestoException(GENERIC_USER_ERROR, format("Table functions for catalog %s could not be resolved.", connectorId.getCatalogName()));
}
private static void validateTableFunction(ConnectorTableFunction tableFunction)
@@ -154,8 +186,8 @@ private static void validateTableFunction(ConnectorTableFunction tableFunction)
// Such a table argument is implicitly 'prune when empty'. The TableArgumentSpecification.Builder enforces the 'prune when empty' property
// for a table argument with row semantics.
- if (tableFunction.getReturnTypeSpecification() instanceof DescribedTable) {
- DescribedTable describedTable = (DescribedTable) tableFunction.getReturnTypeSpecification();
+ if (tableFunction.getReturnTypeSpecification() instanceof DescribedTableReturnTypeSpecification) {
+ DescribedTableReturnTypeSpecification describedTable = (DescribedTableReturnTypeSpecification) tableFunction.getReturnTypeSpecification();
checkArgument(describedTable.getDescriptor().isTyped(), "field types missing in returned type specification");
}
}
diff --git a/presto-main-base/src/main/java/com/facebook/presto/operator/EmptyTableFunctionPartition.java b/presto-main-base/src/main/java/com/facebook/presto/operator/EmptyTableFunctionPartition.java
new file mode 100644
index 0000000000000..bda83ae6319d4
--- /dev/null
+++ b/presto-main-base/src/main/java/com/facebook/presto/operator/EmptyTableFunctionPartition.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.operator;
+
+import com.facebook.presto.common.Page;
+import com.facebook.presto.common.block.Block;
+import com.facebook.presto.common.block.RunLengthEncodedBlock;
+import com.facebook.presto.common.type.Type;
+import com.facebook.presto.spi.PrestoException;
+import com.facebook.presto.spi.function.table.TableFunctionDataProcessor;
+import com.facebook.presto.spi.function.table.TableFunctionProcessorState;
+
+import java.util.List;
+
+import static com.facebook.airlift.concurrent.MoreFutures.toListenableFuture;
+import static com.facebook.presto.spi.StandardErrorCode.FUNCTION_IMPLEMENTATION_ERROR;
+import static com.facebook.presto.spi.function.table.TableFunctionProcessorState.Finished.FINISHED;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This is a class representing empty input to a table function. An EmptyTableFunctionPartition is created
+ * when the table function has KEEP WHEN EMPTY property, which means that the function should be executed
+ * even if the input is empty, and all the table arguments are empty relations.
+ *
+ * An EmptyTableFunctionPartition is created and processed once per node. To avoid duplicated execution,
+ * a table function having KEEP WHEN EMPTY property must have single distribution.
+ */
+public class EmptyTableFunctionPartition
+ implements TableFunctionPartition
+{
+ private final TableFunctionDataProcessor tableFunction;
+ private final int properChannelsCount;
+ private final int passThroughSourcesCount;
+ private final Type[] passThroughTypes;
+
+ public EmptyTableFunctionPartition(TableFunctionDataProcessor tableFunction, int properChannelsCount, int passThroughSourcesCount, List passThroughTypes)
+ {
+ this.tableFunction = requireNonNull(tableFunction, "tableFunction is null");
+ this.properChannelsCount = properChannelsCount;
+ this.passThroughSourcesCount = passThroughSourcesCount;
+ this.passThroughTypes = passThroughTypes.toArray(new Type[] {});
+ }
+
+ @Override
+ public WorkProcessor toOutputPages()
+ {
+ return WorkProcessor.create(() -> {
+ TableFunctionProcessorState state = tableFunction.process(null);
+ if (state == FINISHED) {
+ return WorkProcessor.ProcessState.finished();
+ }
+ if (state instanceof TableFunctionProcessorState.Blocked) {
+ return WorkProcessor.ProcessState.blocked(toListenableFuture(((TableFunctionProcessorState.Blocked) state).getFuture()));
+ }
+ TableFunctionProcessorState.Processed processed = (TableFunctionProcessorState.Processed) state;
+ if (processed.getResult() != null) {
+ return WorkProcessor.ProcessState.ofResult(appendNullsForPassThroughColumns(processed.getResult()));
+ }
+ throw new PrestoException(FUNCTION_IMPLEMENTATION_ERROR, "When function got no input, it should either produce output or return Blocked state");
+ });
+ }
+
+ private Page appendNullsForPassThroughColumns(Page page)
+ {
+ if (page.getChannelCount() != properChannelsCount + passThroughSourcesCount) {
+ throw new PrestoException(
+ FUNCTION_IMPLEMENTATION_ERROR,
+ format(
+ "Table function returned a page containing %s channels. Expected channel number: %s (%s proper columns, %s pass-through index columns)",
+ page.getChannelCount(),
+ properChannelsCount + passThroughSourcesCount,
+ properChannelsCount,
+ passThroughSourcesCount));
+ }
+
+ Block[] resultBlocks = new Block[properChannelsCount + passThroughTypes.length];
+
+ // proper outputs first
+ for (int channel = 0; channel < properChannelsCount; channel++) {
+ resultBlocks[channel] = page.getBlock(channel);
+ }
+
+ // pass-through columns next
+ // because no input was processed, all pass-through indexes in the result page must be null (there are no input rows they could refer to).
+ // for performance reasons this is not checked. All pass-through columns are filled with nulls.
+ int channel = properChannelsCount;
+ for (Type type : passThroughTypes) {
+ resultBlocks[channel] = RunLengthEncodedBlock.create(type, null, page.getPositionCount());
+ channel++;
+ }
+
+ // pass the position count so that the Page can be successfully created in the case when there are no output channels (resultBlocks is empty)
+ return new Page(page.getPositionCount(), resultBlocks);
+ }
+}
diff --git a/presto-main-base/src/main/java/com/facebook/presto/operator/LeafTableFunctionOperator.java b/presto-main-base/src/main/java/com/facebook/presto/operator/LeafTableFunctionOperator.java
new file mode 100644
index 0000000000000..3eb272cef09ec
--- /dev/null
+++ b/presto-main-base/src/main/java/com/facebook/presto/operator/LeafTableFunctionOperator.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.operator;
+
+import com.facebook.presto.common.Page;
+import com.facebook.presto.execution.ScheduledSplit;
+import com.facebook.presto.metadata.Split;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.UpdatablePageSource;
+import com.facebook.presto.spi.function.table.ConnectorTableFunctionHandle;
+import com.facebook.presto.spi.function.table.TableFunctionProcessorProvider;
+import com.facebook.presto.spi.function.table.TableFunctionProcessorState;
+import com.facebook.presto.spi.function.table.TableFunctionProcessorState.Blocked;
+import com.facebook.presto.spi.function.table.TableFunctionProcessorState.Processed;
+import com.facebook.presto.spi.function.table.TableFunctionSplitProcessor;
+import com.facebook.presto.spi.plan.PlanNodeId;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+import static com.facebook.airlift.concurrent.MoreFutures.toListenableFuture;
+import static com.facebook.presto.spi.function.table.TableFunctionProcessorState.Finished.FINISHED;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+public class LeafTableFunctionOperator
+ implements SourceOperator
+{
+ public static class LeafTableFunctionOperatorFactory
+ implements SourceOperatorFactory
+ {
+ private final int operatorId;
+ private final PlanNodeId sourceId;
+ private final TableFunctionProcessorProvider tableFunctionProvider;
+ private final ConnectorTableFunctionHandle functionHandle;
+ private boolean closed;
+
+ public LeafTableFunctionOperatorFactory(int operatorId, PlanNodeId sourceId, TableFunctionProcessorProvider tableFunctionProvider, ConnectorTableFunctionHandle functionHandle)
+ {
+ this.operatorId = operatorId;
+ this.sourceId = requireNonNull(sourceId, "sourceId is null");
+ this.tableFunctionProvider = requireNonNull(tableFunctionProvider, "tableFunctionProvider is null");
+ this.functionHandle = requireNonNull(functionHandle, "functionHandle is null");
+ }
+
+ @Override
+ public PlanNodeId getSourceId()
+ {
+ return sourceId;
+ }
+
+ @Override
+ public SourceOperator createOperator(DriverContext driverContext)
+ {
+ checkState(!closed, "Factory is already closed");
+ OperatorContext operatorContext = driverContext.addOperatorContext(operatorId, sourceId, LeafTableFunctionOperator.class.getSimpleName());
+ return new LeafTableFunctionOperator(operatorContext, sourceId, tableFunctionProvider, functionHandle);
+ }
+
+ @Override
+ public void noMoreOperators()
+ {
+ closed = true;
+ }
+ }
+
+ private final OperatorContext operatorContext;
+ private final PlanNodeId sourceId;
+ private final TableFunctionProcessorProvider tableFunctionProvider;
+ private final ConnectorTableFunctionHandle functionHandle;
+
+ private ConnectorSplit currentSplit;
+ private final List pendingSplits = new ArrayList<>();
+ private boolean noMoreSplits;
+
+ private TableFunctionSplitProcessor processor;
+ private boolean processorUsedData;
+ private boolean processorFinishedSplit = true;
+ private ListenableFuture> processorBlocked = NOT_BLOCKED;
+
+ public LeafTableFunctionOperator(OperatorContext operatorContext, PlanNodeId sourceId, TableFunctionProcessorProvider tableFunctionProvider, ConnectorTableFunctionHandle functionHandle)
+ {
+ this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
+ this.sourceId = requireNonNull(sourceId, "sourceId is null");
+ this.tableFunctionProvider = requireNonNull(tableFunctionProvider, "tableFunctionProvider is null");
+ this.functionHandle = requireNonNull(functionHandle, "functionHandle is null");
+ }
+
+ private void resetProcessor()
+ {
+ this.processor = tableFunctionProvider.getSplitProcessor(functionHandle);
+ this.processorUsedData = false;
+ this.processorFinishedSplit = false;
+ this.processorBlocked = NOT_BLOCKED;
+ }
+
+ @Override
+ public OperatorContext getOperatorContext()
+ {
+ return operatorContext;
+ }
+
+ @Override
+ public PlanNodeId getSourceId()
+ {
+ return sourceId;
+ }
+
+ @Override
+ public boolean needsInput()
+ {
+ return false;
+ }
+
+ @Override
+ public void addInput(Page page)
+ {
+ throw new UnsupportedOperationException(getClass().getName() + " does not take input");
+ }
+
+ @Override
+ public Supplier> addSplit(ScheduledSplit split)
+ {
+ Split curSplit = requireNonNull(split, "split is null").getSplit();
+ checkState(!noMoreSplits, "no more splits expected");
+ ConnectorSplit curConnectorSplit = curSplit.getConnectorSplit();
+ pendingSplits.add(curConnectorSplit);
+ return Optional::empty;
+ }
+
+ @Override
+ public void noMoreSplits()
+ {
+ noMoreSplits = true;
+ }
+
+ @Override
+ public Page getOutput()
+ {
+ if (processorFinishedSplit) {
+ // start processing a new split
+ if (pendingSplits.isEmpty()) {
+ // no more splits to process at the moment
+ return null;
+ }
+ currentSplit = pendingSplits.remove(0);
+ resetProcessor();
+ }
+ else {
+ // a split is being processed
+ requireNonNull(currentSplit, "currentSplit is null");
+ }
+
+ TableFunctionProcessorState state = processor.process(processorUsedData ? null : currentSplit);
+ if (state == FINISHED) {
+ processorFinishedSplit = true;
+ }
+ if (state instanceof Blocked) {
+ Blocked blocked = (Blocked) state;
+ processorBlocked = toListenableFuture(blocked.getFuture());
+ }
+ if (state instanceof Processed) {
+ Processed processed = (Processed) state;
+ if (processed.isUsedInput()) {
+ processorUsedData = true;
+ }
+ if (processed.getResult() != null) {
+ return processed.getResult();
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public ListenableFuture> isBlocked()
+ {
+ return processorBlocked;
+ }
+
+ @Override
+ public void finish()
+ {
+ // this method is redundant. the operator takes no input at all. noMoreSplits() should be called instead.
+ }
+
+ @Override
+ public boolean isFinished()
+ {
+ return processorFinishedSplit && pendingSplits.isEmpty() && noMoreSplits;
+ }
+}
diff --git a/presto-main-base/src/main/java/com/facebook/presto/operator/PageBuffer.java b/presto-main-base/src/main/java/com/facebook/presto/operator/PageBuffer.java
new file mode 100644
index 0000000000000..cb14500597a1b
--- /dev/null
+++ b/presto-main-base/src/main/java/com/facebook/presto/operator/PageBuffer.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.operator;
+
+import com.facebook.presto.common.Page;
+import com.facebook.presto.operator.WorkProcessor.ProcessState;
+import jakarta.annotation.Nullable;
+
+import static com.facebook.presto.operator.WorkProcessor.ProcessState.finished;
+import static com.facebook.presto.operator.WorkProcessor.ProcessState.ofResult;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+public class PageBuffer
+{
+ @Nullable
+ private Page page;
+ private boolean finished;
+
+ public WorkProcessor pages()
+ {
+ return WorkProcessor.create(() -> {
+ if (isFinished() && isEmpty()) {
+ return finished();
+ }
+
+ if (!isEmpty()) {
+ Page result = page;
+ page = null;
+ return ofResult(result);
+ }
+
+ return ProcessState.yield();
+ });
+ }
+
+ public boolean isEmpty()
+ {
+ return page == null;
+ }
+
+ public boolean isFinished()
+ {
+ return finished;
+ }
+
+ public void add(Page page)
+ {
+ checkState(isEmpty(), "page buffer is not empty");
+ checkState(!isFinished(), "page buffer is finished");
+ requireNonNull(page, "page is null");
+
+ if (page.getPositionCount() == 0) {
+ return;
+ }
+
+ this.page = page;
+ }
+
+ public void finish()
+ {
+ finished = true;
+ }
+}
diff --git a/presto-main-base/src/main/java/com/facebook/presto/operator/PagesIndex.java b/presto-main-base/src/main/java/com/facebook/presto/operator/PagesIndex.java
index 640ae9919ca90..f4da55ac314e0 100644
--- a/presto-main-base/src/main/java/com/facebook/presto/operator/PagesIndex.java
+++ b/presto-main-base/src/main/java/com/facebook/presto/operator/PagesIndex.java
@@ -270,9 +270,9 @@ public void swap(int a, int b)
valueAddresses.swap(a, b);
}
- public int buildPage(int position, int[] outputChannels, PageBuilder pageBuilder)
+ public int buildPage(int position, int endPosition, int[] outputChannels, PageBuilder pageBuilder)
{
- while (!pageBuilder.isFull() && position < positionCount) {
+ while (!pageBuilder.isFull() && position < endPosition) {
long pageAddress = valueAddresses.get(position);
int blockIndex = decodeSliceIndex(pageAddress);
int blockPosition = decodePosition(pageAddress);
@@ -562,10 +562,29 @@ protected Page computeNext()
}
public Iterator getSortedPages()
+ {
+ return getSortedPagesFromRange(0, positionCount);
+ }
+
+ /**
+ * Get sorted pages from the specified section of the PagesIndex.
+ *
+ * @param start start position of the section, inclusive
+ * @param end end position of the section, exclusive
+ * @return iterator of pages
+ */
+ public Iterator getSortedPages(int start, int end)
+ {
+ checkArgument(start >= 0 && end <= positionCount, "position range out of bounds");
+ checkArgument(start <= end, "invalid position range");
+ return getSortedPagesFromRange(start, end);
+ }
+
+ private Iterator getSortedPagesFromRange(int start, int end)
{
return new AbstractIterator()
{
- private int currentPosition;
+ private int currentPosition = start;
private final PageBuilder pageBuilder = new PageBuilder(types);
private final int[] outputChannels = new int[types.size()];
@@ -576,7 +595,7 @@ public Iterator getSortedPages()
@Override
public Page computeNext()
{
- currentPosition = buildPage(currentPosition, outputChannels, pageBuilder);
+ currentPosition = buildPage(currentPosition, end, outputChannels, pageBuilder);
if (pageBuilder.isEmpty()) {
return endOfData();
}
diff --git a/presto-main-base/src/main/java/com/facebook/presto/operator/RegularTableFunctionPartition.java b/presto-main-base/src/main/java/com/facebook/presto/operator/RegularTableFunctionPartition.java
new file mode 100644
index 0000000000000..5d0376f3be7ee
--- /dev/null
+++ b/presto-main-base/src/main/java/com/facebook/presto/operator/RegularTableFunctionPartition.java
@@ -0,0 +1,438 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.operator;
+
+import com.facebook.presto.common.Page;
+import com.facebook.presto.common.block.Block;
+import com.facebook.presto.common.block.BlockBuilder;
+import com.facebook.presto.common.block.RunLengthEncodedBlock;
+import com.facebook.presto.common.type.Type;
+import com.facebook.presto.spi.PrestoException;
+import com.facebook.presto.spi.function.table.TableFunctionDataProcessor;
+import com.facebook.presto.spi.function.table.TableFunctionProcessorState;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.primitives.Ints;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.Set;
+
+import static com.facebook.airlift.concurrent.MoreFutures.toListenableFuture;
+import static com.facebook.presto.common.Utils.checkState;
+import static com.facebook.presto.common.type.BigintType.BIGINT;
+import static com.facebook.presto.spi.StandardErrorCode.FUNCTION_IMPLEMENTATION_ERROR;
+import static com.facebook.presto.spi.function.table.TableFunctionProcessorState.Finished.FINISHED;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+import static java.lang.Math.min;
+import static java.lang.Math.toIntExact;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+
+public class RegularTableFunctionPartition
+ implements TableFunctionPartition
+{
+ private final PagesIndex pagesIndex;
+ private final int partitionStart;
+ private final int partitionEnd;
+ private final Iterator sortedPages;
+
+ private final TableFunctionDataProcessor tableFunction;
+ private final int properChannelsCount;
+ private final int passThroughSourcesCount;
+
+ // channels required by the table function, listed by source in order of argument declarations
+ private final int[][] requiredChannels;
+
+ // for each input channel, the end position of actual data in that channel (exclusive) relative to partition. The remaining rows are "filler" rows, and should not be passed to table function or passed-through
+ private final int[] endOfData;
+
+ // a builder for each pass-through column, in order of argument declarations
+ private final PassThroughColumnProvider[] passThroughProviders;
+
+ // number of processed input positions from partition start. all sources have been processed up to this position, except the sources whose partitions ended earlier.
+ private int processedPositions;
+
+ public RegularTableFunctionPartition(
+ PagesIndex pagesIndex,
+ int partitionStart,
+ int partitionEnd,
+ TableFunctionDataProcessor tableFunction,
+ int properChannelsCount,
+ int passThroughSourcesCount,
+ List> requiredChannels,
+ Optional