diff --git a/phoenix-core-client/src/main/antlr3/PhoenixSQL.g b/phoenix-core-client/src/main/antlr3/PhoenixSQL.g index a2e913093e9..2324c78d673 100644 --- a/phoenix-core-client/src/main/antlr3/PhoenixSQL.g +++ b/phoenix-core-client/src/main/antlr3/PhoenixSQL.g @@ -865,9 +865,10 @@ finally{ contextStack.pop(); } // Parse a full upsert expression structure. upsert_node returns [UpsertStatement ret] +@init{List> v = new ArrayList>(); } : UPSERT (hint=hintClause)? INTO t=from_table_name (LPAREN p=upsert_column_refs RPAREN)? - ((VALUES LPAREN v=one_or_more_expressions RPAREN ( + ((VALUES LPAREN e = one_or_more_expressions {v.add(e);} RPAREN (COMMA LPAREN e = one_or_more_expressions {v.add(e);} RPAREN )* ( ON DUPLICATE KEY ( ig=IGNORE | ( upd=UPDATE pairs=update_column_pairs ) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 7d426904201..b9dfbc33e57 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -26,6 +26,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; +import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; import java.util.Collections; @@ -383,7 +384,7 @@ public MutationPlan compile(UpsertStatement upsert) throws SQLException { int[] columnIndexesToBe; int nColumnsToSet = 0; int[] pkSlotIndexesToBe; - List valueNodes = upsert.getValues(); + List> valueNodesList = upsert.getValues(); List targetColumns; NamedTableNode tableNode = upsert.getTable(); String tableName = tableNode.getName().getTableName(); @@ -557,8 +558,12 @@ public MutationPlan compile(UpsertStatement upsert) throws SQLException { columnsBeingSet.set(columnIndexesToBe[i] = rowTimestampCol.getPosition()); pkSlotIndexesToBe[i] = table.getRowTimestampColPos(); targetColumns.add(rowTimestampCol); - if (valueNodes != null && !valueNodes.isEmpty()) { - valueNodes.add(getNodeForRowTimestampColumn(rowTimestampCol)); + if (valueNodesList != null) { + for (List parseNode : valueNodesList) { + if (!parseNode.isEmpty()) { + parseNode.add(getNodeForRowTimestampColumn(rowTimestampCol)); + } + } } nColumnsToSet++; } @@ -571,7 +576,7 @@ public MutationPlan compile(UpsertStatement upsert) throws SQLException { } } boolean isAutoCommit = connection.getAutoCommit(); - if (valueNodes == null) { + if (valueNodesList.isEmpty()) { SelectStatement select = upsert.getSelect(); assert (select != null); select = SubselectRewriter.flatten(select, connection); @@ -675,8 +680,8 @@ public MutationPlan compile(UpsertStatement upsert) throws SQLException { // Cannot auto commit if doing aggregation or topN or salted // Salted causes problems because the row may end up living on a different region } else { - nValuesToSet = valueNodes.size() + addViewColumnsToBe.size() + (isTenantSpecific ? 1 : 0) - + (isSharedViewIndex ? 1 : 0); + nValuesToSet = valueNodesList.get(0).size() + addViewColumnsToBe.size() + + (isTenantSpecific ? 1 : 0) + (isSharedViewIndex ? 1 : 0); } // Resize down to allow a subset of columns to be specifiable if (columnNodes.isEmpty() && columnIndexesToBe.length >= nValuesToSet) { @@ -703,7 +708,7 @@ public MutationPlan compile(UpsertStatement upsert) throws SQLException { final QueryPlan originalQueryPlan = queryPlanToBe; RowProjector projectorToBe = null; // Optimize only after all checks have been performed - if (valueNodes == null) { + if (valueNodesList.isEmpty()) { queryPlanToBe = new QueryOptimizer(services).optimize(queryPlanToBe, statement, targetColumns, parallelIteratorFactoryToBe); projectorToBe = queryPlanToBe.getProjector(); @@ -726,7 +731,7 @@ public MutationPlan compile(UpsertStatement upsert) throws SQLException { //////////////////////////////////////////////////////////////////// // UPSERT SELECT ///////////////////////////////////////////////////////////////////// - if (valueNodes == null) { + if (valueNodesList.isEmpty()) { // Before we re-order, check that for updatable view columns // the projected expression either matches the column name or // is a constant with the same required value. @@ -851,101 +856,113 @@ public MutationPlan compile(UpsertStatement upsert) throws SQLException { //////////////////////////////////////////////////////////////////// // UPSERT VALUES ///////////////////////////////////////////////////////////////////// - final byte[][] values = new byte[nValuesToSet][]; - int nodeIndex = 0; - if (isSharedViewIndex) { - values[nodeIndex++] = table.getviewIndexIdType().toBytes(table.getViewIndexId()); - } - if (isTenantSpecific) { - PName tenantId = connection.getTenantId(); - values[nodeIndex++] = ScanUtil.getTenantIdBytes(table.getRowKeySchema(), - table.getBucketNum() != null, tenantId, isSharedViewIndex); - } - - final int nodeIndexOffset = nodeIndex; - // Allocate array based on size of all columns in table, - // since some values may not be set (if they're nullable). - final StatementContext context = + StatementContext context = new StatementContext(statement, resolver, new Scan(), new SequenceManager(statement)); - UpsertValuesCompiler expressionBuilder = new UpsertValuesCompiler(context); - final List constantExpressions = - Lists.newArrayListWithExpectedSize(valueNodes.size()); - // First build all the expressions, as with sequences we want to collect them all first - // and initialize them in one batch - List> jsonExpressions = Lists.newArrayList(); - List> nonPKColumns = Lists.newArrayList(); - for (ParseNode valueNode : valueNodes) { - if (!valueNode.hasJsonExpression() && !valueNode.isStateless()) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.VALUE_IN_UPSERT_NOT_CONSTANT).build() - .buildException(); - } - PColumn column = allColumns.get(columnIndexes[nodeIndex]); - expressionBuilder.setColumn(column); - Expression expression = valueNode.accept(expressionBuilder); - if ( - expression.getDataType() != null - && !expression.getDataType().isCastableTo(column.getDataType()) - ) { - throw TypeMismatchException.newException(expression.getDataType(), column.getDataType(), - "expression: " + expression.toString() + " in column " + column); + List valuesList = new ArrayList<>(); + int nodeIndexOffset = 0; + List> constantExpressionsList = new ArrayList<>(); + + byte[] onDupKeyBytes = null; + OnDuplicateKeyType onDupKeyType = null; + + for (List valueNodesItem : valueNodesList) { + byte[][] values = new byte[nValuesToSet][]; + int nodeIndex = 0; + if (isSharedViewIndex) { + values[nodeIndex++] = table.getviewIndexIdType().toBytes(table.getViewIndexId()); } - if (!SchemaUtil.isPKColumn(column) && !valueNode.hasJsonExpression()) { - nonPKColumns - .add(new Pair<>(ColumnName.caseSensitiveColumnName(column.getFamilyName().getString(), - column.getName().getString()), valueNode)); - } else if (valueNode.hasJsonExpression()) { - jsonExpressions - .add(new Pair<>(ColumnName.caseSensitiveColumnName(column.getFamilyName().getString(), - column.getName().getString()), valueNode)); + if (isTenantSpecific) { + PName tenantId = connection.getTenantId(); + values[nodeIndex++] = ScanUtil.getTenantIdBytes(table.getRowKeySchema(), + table.getBucketNum() != null, tenantId, isSharedViewIndex); } - constantExpressions.add(expression); - nodeIndex++; - } - if (nonPKColumns.size() > 0 && jsonExpressions.size() > 0) { - jsonExpressions.addAll(nonPKColumns); - nonPKColumns.clear(); - } - byte[] onDupKeyBytesToBe = null; - List> onDupKeyPairs = upsert.getOnDupKeyPairs(); - OnDuplicateKeyType onDupKeyType = upsert.getOnDupKeyType(); - if (onDupKeyPairs != null) { - if (table.isImmutableRows()) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_IMMUTABLE) - .setSchemaName(table.getSchemaName().getString()) - .setTableName(table.getTableName().getString()).build().buildException(); - } - if (table.isTransactional()) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_TRANSACTIONAL) - .setSchemaName(table.getSchemaName().getString()) - .setTableName(table.getTableName().getString()).build().buildException(); + nodeIndexOffset = nodeIndex; + // Allocate array based on size of all columns in table, + // since some values may not be set (if they're nullable). + UpsertValuesCompiler expressionBuilder = new UpsertValuesCompiler(context); + List constantExpressions = + Lists.newArrayListWithExpectedSize(valueNodesItem.size()); + // First build all the expressions, as with sequences we want to collect them all first + // and initialize them in one batch + List> jsonExpressions = Lists.newArrayList(); + List> nonPKColumns = Lists.newArrayList(); + for (ParseNode valueNode : valueNodesItem) { + if (!valueNode.hasJsonExpression() && !valueNode.isStateless()) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.VALUE_IN_UPSERT_NOT_CONSTANT).build() + .buildException(); + } + PColumn column = allColumns.get(columnIndexes[nodeIndex]); + expressionBuilder.setColumn(column); + Expression expression = valueNode.accept(expressionBuilder); + if ( + expression.getDataType() != null + && !expression.getDataType().isCastableTo(column.getDataType()) + ) { + throw TypeMismatchException.newException(expression.getDataType(), column.getDataType(), + "expression: " + expression.toString() + " in column " + column); + } + if (!SchemaUtil.isPKColumn(column) && !valueNode.hasJsonExpression()) { + nonPKColumns + .add(new Pair<>(ColumnName.caseSensitiveColumnName(column.getFamilyName().getString(), + column.getName().getString()), valueNode)); + } else if (valueNode.hasJsonExpression()) { + jsonExpressions + .add(new Pair<>(ColumnName.caseSensitiveColumnName(column.getFamilyName().getString(), + column.getName().getString()), valueNode)); + } + constantExpressions.add(expression); + nodeIndex++; } - if (connection.getSCN() != null) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SET_SCN_IN_ON_DUP_KEY) - .setSchemaName(table.getSchemaName().getString()) - .setTableName(table.getTableName().getString()).build().buildException(); + if (nonPKColumns.size() > 0 && jsonExpressions.size() > 0) { + jsonExpressions.addAll(nonPKColumns); + nonPKColumns.clear(); } - - switch (onDupKeyType) { - case IGNORE: { - onDupKeyBytesToBe = PhoenixIndexBuilderHelper.serializeOnDupKeyIgnore(); - break; + byte[] onDupKeyBytesToBe = null; + List> onDupKeyPairs = upsert.getOnDupKeyPairs(); + onDupKeyType = upsert.getOnDupKeyType(); + + if (onDupKeyPairs != null) { + if (table.isImmutableRows()) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_IMMUTABLE) + .setSchemaName(table.getSchemaName().getString()) + .setTableName(table.getTableName().getString()).build().buildException(); } - case UPDATE: - case UPDATE_ONLY: { - onDupKeyBytesToBe = getOnDuplicateKeyBytes(table, context, onDupKeyPairs, resolver); - break; + if (table.isTransactional()) { + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_TRANSACTIONAL) + .setSchemaName(table.getSchemaName().getString()) + .setTableName(table.getTableName().getString()).build().buildException(); } - default: - break; + if (connection.getSCN() != null) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SET_SCN_IN_ON_DUP_KEY) + .setSchemaName(table.getSchemaName().getString()) + .setTableName(table.getTableName().getString()).build().buildException(); + } + + switch (onDupKeyType) { + case IGNORE: { + onDupKeyBytesToBe = PhoenixIndexBuilderHelper.serializeOnDupKeyIgnore(); + break; + } + case UPDATE: + case UPDATE_ONLY: { + onDupKeyBytesToBe = getOnDuplicateKeyBytes(table, context, onDupKeyPairs, resolver); + break; + } + default: + break; + } + } else if (!jsonExpressions.isEmpty()) { + onDupKeyBytesToBe = getOnDuplicateKeyBytes(table, context, jsonExpressions, resolver); } - } else if (!jsonExpressions.isEmpty()) { - onDupKeyBytesToBe = getOnDuplicateKeyBytes(table, context, jsonExpressions, resolver); + onDupKeyBytes = onDupKeyBytesToBe; + valuesList.add(values); + constantExpressionsList.add(constantExpressions); } - final byte[] onDupKeyBytes = onDupKeyBytesToBe; - return new UpsertValuesMutationPlan(context, tableRef, nodeIndexOffset, constantExpressions, - allColumns, columnIndexes, overlapViewColumns, values, addViewColumns, connection, + return new UpsertValuesMutationPlan(context, tableRef, nodeIndexOffset, constantExpressionsList, + allColumns, columnIndexes, overlapViewColumns, valuesList, addViewColumns, connection, pkSlotIndexes, useServerTimestamp, onDupKeyBytes, onDupKeyType, maxSize, maxSizeBytes); } @@ -1252,11 +1269,11 @@ private class UpsertValuesMutationPlan implements MutationPlan { private final StatementContext context; private final TableRef tableRef; private final int nodeIndexOffset; - private final List constantExpressions; + private final List> constantExpressionsList; private final List allColumns; private final int[] columnIndexes; private final Set overlapViewColumns; - private final byte[][] values; + private final List valuesList; private final Set addViewColumns; private final PhoenixConnection connection; private final int[] pkSlotIndexes; @@ -1267,19 +1284,19 @@ private class UpsertValuesMutationPlan implements MutationPlan { private final long maxSizeBytes; public UpsertValuesMutationPlan(StatementContext context, TableRef tableRef, - int nodeIndexOffset, List constantExpressions, List allColumns, - int[] columnIndexes, Set overlapViewColumns, byte[][] values, + int nodeIndexOffset, List> constantExpressionsList, List allColumns, + int[] columnIndexes, Set overlapViewColumns, List valuesList, Set addViewColumns, PhoenixConnection connection, int[] pkSlotIndexes, boolean useServerTimestamp, byte[] onDupKeyBytes, OnDuplicateKeyType onDupKeyType, int maxSize, long maxSizeBytes) { this.context = context; this.tableRef = tableRef; this.nodeIndexOffset = nodeIndexOffset; - this.constantExpressions = constantExpressions; + this.constantExpressionsList = constantExpressionsList; this.allColumns = allColumns; this.columnIndexes = columnIndexes; this.overlapViewColumns = overlapViewColumns; - this.values = values; + this.valuesList = valuesList; this.addViewColumns = addViewColumns; this.connection = connection; this.pkSlotIndexes = pkSlotIndexes; @@ -1325,74 +1342,83 @@ public MutationState execute() throws SQLException { ImmutableBytesWritable ptr = context.getTempPtr(); final SequenceManager sequenceManager = context.getSequenceManager(); // Next evaluate all the expressions - int nodeIndex = nodeIndexOffset; PTable table = tableRef.getTable(); Tuple tuple = sequenceManager.getSequenceCount() == 0 ? null : sequenceManager.newSequenceTuple(null); - for (Expression constantExpression : constantExpressions) { - if (!constantExpression.isStateless()) { - nodeIndex++; - continue; - } - PColumn column = allColumns.get(columnIndexes[nodeIndex]); - constantExpression.evaluate(tuple, ptr); - Object value = null; - if (constantExpression.getDataType() != null) { - value = constantExpression.getDataType().toObject(ptr, constantExpression.getSortOrder(), - constantExpression.getMaxLength(), constantExpression.getScale()); - if (!constantExpression.getDataType().isCoercibleTo(column.getDataType(), value)) { - throw TypeMismatchException.newException(constantExpression.getDataType(), - column.getDataType(), - "expression: " + constantExpression.toString() + " in column " + column); + for (int index = 0; index < valuesList.size(); index++) { + byte[][] valuesListItem = valuesList.get(index); + int nodeIndex = nodeIndexOffset; + for (Expression constantExpression : constantExpressionsList.get(index)) { + if (!constantExpression.isStateless()) { + nodeIndex++; + continue; + } + PColumn column = allColumns.get(columnIndexes[nodeIndex]); + constantExpression.evaluate(tuple, ptr); + Object value = null; + if (constantExpression.getDataType() != null) { + value = + constantExpression.getDataType().toObject(ptr, constantExpression.getSortOrder(), + constantExpression.getMaxLength(), constantExpression.getScale()); + if (!constantExpression.getDataType().isCoercibleTo(column.getDataType(), value)) { + throw TypeMismatchException.newException(constantExpression.getDataType(), + column.getDataType(), + "expression: " + constantExpression.toString() + " in column " + column); + } + if ( + !column.getDataType().isSizeCompatible(ptr, value, constantExpression.getDataType(), + constantExpression.getSortOrder(), constantExpression.getMaxLength(), + constantExpression.getScale(), column.getMaxLength(), column.getScale()) + ) { + throw new DataExceedsCapacityException(column.getDataType(), column.getMaxLength(), + column.getScale(), column.getName().getString()); + } } + column.getDataType().coerceBytes(ptr, value, constantExpression.getDataType(), + constantExpression.getMaxLength(), constantExpression.getScale(), + constantExpression.getSortOrder(), column.getMaxLength(), column.getScale(), + column.getSortOrder(), table.rowKeyOrderOptimizable()); if ( - !column.getDataType().isSizeCompatible(ptr, value, constantExpression.getDataType(), - constantExpression.getSortOrder(), constantExpression.getMaxLength(), - constantExpression.getScale(), column.getMaxLength(), column.getScale()) + overlapViewColumns.contains(column) + && Bytes.compareTo(ptr.get(), ptr.getOffset(), ptr.getLength(), + column.getViewConstant(), 0, column.getViewConstant().length - 1) != 0 ) { - throw new DataExceedsCapacityException(column.getDataType(), column.getMaxLength(), - column.getScale(), column.getName().getString()); + throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_UPDATE_VIEW_COLUMN) + .setColumnName(column.getName().getString()) + .setMessage("value=" + constantExpression.toString()).build().buildException(); } + valuesListItem[nodeIndex] = ByteUtil.copyKeyBytesIfNecessary(ptr); + nodeIndex++; } - column.getDataType().coerceBytes(ptr, value, constantExpression.getDataType(), - constantExpression.getMaxLength(), constantExpression.getScale(), - constantExpression.getSortOrder(), column.getMaxLength(), column.getScale(), - column.getSortOrder(), table.rowKeyOrderOptimizable()); - if ( - overlapViewColumns.contains(column) && Bytes.compareTo(ptr.get(), ptr.getOffset(), - ptr.getLength(), column.getViewConstant(), 0, column.getViewConstant().length - 1) != 0 - ) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_UPDATE_VIEW_COLUMN) - .setColumnName(column.getName().getString()) - .setMessage("value=" + constantExpression.toString()).build().buildException(); + // Add columns based on view + for (PColumn column : addViewColumns) { + if (IndexUtil.getViewConstantValue(column, ptr)) { + valuesListItem[nodeIndex++] = ByteUtil.copyKeyBytesIfNecessary(ptr); + } else { + throw new IllegalStateException(); + } } - values[nodeIndex] = ByteUtil.copyKeyBytesIfNecessary(ptr); - nodeIndex++; } - // Add columns based on view - for (PColumn column : addViewColumns) { - if (IndexUtil.getViewConstantValue(column, ptr)) { - values[nodeIndex++] = ByteUtil.copyKeyBytesIfNecessary(ptr); - } else { - throw new IllegalStateException(); + + MultiRowMutationState mutation = new MultiRowMutationState(valuesList.size()); + for (byte[][] valuesListItems : valuesList) { + IndexMaintainer indexMaintainer = null; + byte[][] viewConstants = null; + if (table.getIndexType() == IndexType.LOCAL) { + PTable parentTable = statement.getConnection().getMetaDataCache() + .getTableRef(new PTableKey(statement.getConnection().getTenantId(), + table.getParentName().getString())) + .getTable(); + indexMaintainer = table.getIndexMaintainer(parentTable, connection); + viewConstants = IndexUtil.getViewConstants(parentTable); } + int maxHBaseClientKeyValueSize = statement.getConnection().getQueryServices().getProps() + .getInt(QueryServices.HBASE_CLIENT_KEYVALUE_MAXSIZE, + QueryServicesOptions.DEFAULT_HBASE_CLIENT_KEYVALUE_MAXSIZE); + setValues(valuesListItems, pkSlotIndexes, columnIndexes, table, mutation, statement, + useServerTimestamp, indexMaintainer, viewConstants, onDupKeyBytes, onDupKeyType, 0, + maxHBaseClientKeyValueSize); } - MultiRowMutationState mutation = new MultiRowMutationState(1); - IndexMaintainer indexMaintainer = null; - byte[][] viewConstants = null; - if (table.getIndexType() == IndexType.LOCAL) { - PTable parentTable = statement.getConnection().getMetaDataCache().getTableRef( - new PTableKey(statement.getConnection().getTenantId(), table.getParentName().getString())) - .getTable(); - indexMaintainer = table.getIndexMaintainer(parentTable, connection); - viewConstants = IndexUtil.getViewConstants(parentTable); - } - int maxHBaseClientKeyValueSize = statement.getConnection().getQueryServices().getProps() - .getInt(QueryServices.HBASE_CLIENT_KEYVALUE_MAXSIZE, - QueryServicesOptions.DEFAULT_HBASE_CLIENT_KEYVALUE_MAXSIZE); - setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, - useServerTimestamp, indexMaintainer, viewConstants, onDupKeyBytes, onDupKeyType, 0, - maxHBaseClientKeyValueSize); return new MutationState(tableRef, mutation, 0, maxSize, maxSizeBytes, connection); } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index 8ef621d5351..bfa148d32c6 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -1181,7 +1181,7 @@ private static class ExecutableUpsertStatement extends UpsertStatement implements CompilableStatement { private ExecutableUpsertStatement(NamedTableNode table, HintNode hintNode, - List columns, List values, SelectStatement select, int bindCount, + List columns, List> values, SelectStatement select, int bindCount, Map udfParseNodes, List> onDupKeyPairs, OnDuplicateKeyType onDupKeyType, boolean returningRow) { super(table, hintNode, columns, values, select, bindCount, udfParseNodes, onDupKeyPairs, @@ -2126,7 +2126,7 @@ public ExecutableSelectStatement select(TableNode from, HintNode hint, boolean i @Override public ExecutableUpsertStatement upsert(NamedTableNode table, HintNode hintNode, - List columns, List values, SelectStatement select, int bindCount, + List columns, List> values, SelectStatement select, int bindCount, Map udfParseNodes, List> onDupKeyPairs, UpsertStatement.OnDuplicateKeyType onDupKeyType, boolean returningRow) { return new ExecutableUpsertStatement(table, hintNode, columns, values, select, bindCount, diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java index 6a226646950..f3c373601c9 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java @@ -898,7 +898,7 @@ public SelectStatement select(TableNode from, HintNode hint, boolean isDistinct, } public UpsertStatement upsert(NamedTableNode table, HintNode hint, List columns, - List values, SelectStatement select, int bindCount, + List> values, SelectStatement select, int bindCount, Map udfParseNodes, List> onDupKeyPairs, UpsertStatement.OnDuplicateKeyType onDupKeyType, boolean returningRow) { return new UpsertStatement(table, hint, columns, values, select, bindCount, udfParseNodes, diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/UpsertStatement.java b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/UpsertStatement.java index d89b48bdb51..ade24f2aef0 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/UpsertStatement.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/UpsertStatement.java @@ -32,7 +32,7 @@ public enum OnDuplicateKeyType { } private final List columns; - private final List values; + private final List> values; private final SelectStatement select; private final HintNode hint; private final List> onDupKeyPairs; @@ -40,7 +40,7 @@ public enum OnDuplicateKeyType { private final boolean returningRow; public UpsertStatement(NamedTableNode table, HintNode hint, List columns, - List values, SelectStatement select, int bindCount, + List> values, SelectStatement select, int bindCount, Map udfParseNodes, List> onDupKeyPairs, OnDuplicateKeyType onDupKeyType, boolean returningRow) { super(table, bindCount, udfParseNodes); @@ -57,7 +57,7 @@ public List getColumns() { return columns; } - public List getValues() { + public List> getValues() { return values; } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultipleUpsertIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultipleUpsertIT.java new file mode 100644 index 00000000000..e12cc0e9227 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultipleUpsertIT.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.util.Properties; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(ParallelStatsDisabledTest.class) +public class MultipleUpsertIT extends ParallelStatsDisabledIT { + @Test + public void testUpsertMultiple() throws Exception { + Properties props = new Properties(); + Connection conn = DriverManager.getConnection(getUrl(), props); + String tableName = generateUniqueName(); + String ddl = + "CREATE TABLE " + tableName + "(K VARCHAR NOT NULL PRIMARY KEY, INT INTEGER, INT2 INTEGER)"; + conn.createStatement().execute(ddl); + + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('A', 11, 12)"); + conn.createStatement().execute("UPSERT INTO " + tableName + "(K, INT) VALUES ('B', 2)"); + conn.createStatement() + .execute("UPSERT INTO " + tableName + "(K, INT, INT2) VALUES ('E', 5, 5),('F', 61, 6)"); + conn.createStatement() + .execute("UPSERT INTO " + tableName + " VALUES ('C', 31, 32),('D', 41, 42)"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('G', 7, 72),('H', 8)"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('I', 9),('I', 10)"); + conn.commit(); + + ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + tableName); + assertTrue(rs.next()); + assertEquals(9, rs.getInt(1)); + + rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName + " ORDER BY K"); + rs.next(); + assertEquals(rs.getString(1), "A"); + assertEquals(rs.getInt(2), 11); + assertEquals(rs.getInt(3), 12); + rs.next(); + assertEquals(rs.getString(1), "B"); + assertEquals(rs.getInt(2), 2); + rs.next(); + assertEquals(rs.getString(1), "C"); + assertEquals(rs.getInt(2), 31); + assertEquals(rs.getInt(3), 32); + rs.next(); + assertEquals(rs.getString(1), "D"); + assertEquals(rs.getInt(2), 41); + assertEquals(rs.getInt(3), 42); + rs.next(); + assertEquals(rs.getString(1), "E"); + assertEquals(rs.getInt(2), 5); + assertEquals(rs.getInt(3), 5); + rs.next(); + assertEquals(rs.getString(1), "F"); + assertEquals(rs.getInt(2), 61); + assertEquals(rs.getInt(3), 6); + rs.next(); + assertEquals(rs.getString(1), "G"); + assertEquals(rs.getInt(2), 7); + assertEquals(rs.getInt(3), 72); + rs.next(); + assertEquals(rs.getString(1), "H"); + assertEquals(rs.getInt(2), 8); + rs.next(); + assertEquals(rs.getString(1), "I"); + assertEquals(rs.getInt(2), 10); + } + + @Test + public void testUpsertMultiple2() throws Exception { + Properties props = new Properties(); + Connection conn = DriverManager.getConnection(getUrl(), props); + String tableName = generateUniqueName(); + String ddl = "CREATE TABLE " + tableName + "(K VARCHAR NOT NULL PRIMARY KEY, INT INTEGER)"; + conn.createStatement().execute(ddl); + + conn.createStatement() + .execute("UPSERT INTO " + tableName + " VALUES ('A', 1),(SUBSTR('APPLE',0,2), 2*2)"); + conn.createStatement() + .execute("UPSERT INTO " + tableName + " VALUES (SUBSTR('DELTA',0,1), 5),('C', 2*3)"); + conn.commit(); + + ResultSet rs = + conn.createStatement().executeQuery("SELECT * FROM " + tableName + " ORDER BY K"); + rs.next(); + assertEquals(rs.getString(1), "A"); + assertEquals(rs.getInt(2), 1); + rs.next(); + assertEquals(rs.getString(1), "AP"); + assertEquals(rs.getInt(2), 4); + rs.next(); + assertEquals(rs.getString(1), "C"); + assertEquals(rs.getInt(2), 6); + rs.next(); + assertEquals(rs.getString(1), "D"); + assertEquals(rs.getInt(2), 5); + assertFalse(rs.next()); + + } +} diff --git a/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java b/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java index 1b5c8008f75..94deefc7983 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java @@ -765,6 +765,42 @@ public void testDeleteWithOrderLimitWhereReturningRow() throws Exception { parseQuery(sql); } + @Test + public void testValidMultipleUpsert() throws Exception { + String sql = (("upsert into t VALUES(1,2),(3,4)")); + parseQuery(sql); + } + + @Test + public void testValidMultipleUpsert2() throws Exception { + String sql = "upsert into t(a,b) VALUES(1,2),(3,4)"; + parseQuery(sql); + } + + @Test + public void testValidMultipleUpsert3() throws Exception { + String sql = (("upsert into t(a,b) VALUES(1,2),(3,4),")); + parseQueryThatShouldFail(sql); + } + + @Test + public void testValidMultipleUpsert4() throws Exception { + String sql = (("upsert into t(a,b) VALUES()")); + parseQueryThatShouldFail(sql); + } + + @Test + public void testValidMultipleUpsert5() throws Exception { + String sql = (("upsert into t(a,b) VALUES(1,2)(3,4)")); + parseQueryThatShouldFail(sql); + } + + @Test + public void testValidMultipleUpsert6() throws Exception { + String sql = (("upsert into t(a,b) VALUES(1,2),(3,4")); + parseQueryThatShouldFail(sql); + } + @Test public void testDeleteInvalidReturningRow() throws Exception { String sql = "DELETE FROM T RETURNING PK1";