Skip to content

HIVE-28956 : Use msdb.alterPartitions() API and implement batching for alter table add column cascade command #5814

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -409,23 +409,41 @@ public List<Void> run(List<Partition> input) throws Exception {
if (runPartitionMetadataUpdate) {
if (cascade || retainOnColRemoval) {
parts = msdb.getPartitions(catName, dbname, name, -1);
for (Partition part : parts) {
Partition oldPart = new Partition(part);
List<FieldSchema> oldCols = part.getSd().getCols();
part.getSd().setCols(newt.getSd().getCols());
List<ColumnStatistics> colStats = updateOrGetPartitionColumnStats(msdb, catName, dbname, name,
part.getValues(), oldCols, oldt, part, null, null);
assert (colStats.isEmpty());
Deadline.checkTimeout();
if (cascade) {
msdb.alterPartition(
catName, dbname, name, part.getValues(), part, writeIdList);
} else {
String catalogName = catName;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vikramahuja1001, why do we need new local vars? otherwise looks good to me

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to access these variables within inner class. Cannot directly use catName, dbName. So copied them to effective final temp variables

Copy link
Member

@deniskuzZ deniskuzZ Jun 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vikramahuja1001, would be cleaner if you do this at top level.
Try applying the below patch to your branch

Subject: [PATCH] refactor
---
Index: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java	(revision 2a5cc01ba722cd0026c774ab118f157b169ee9a4)
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java	(date 1749733306367)
@@ -103,9 +103,9 @@
       String name, Table newt, EnvironmentContext environmentContext,
       IHMSHandler handler, String writeIdList)
           throws InvalidOperationException, MetaException {
-    catName = normalizeIdentifier(catName);
-    name = normalizeIdentifier(name);
-    dbname = normalizeIdentifier(dbname);
+    String catalogName = normalizeIdentifier(catName);
+    String tableName = normalizeIdentifier(name);
+    String databaseName = normalizeIdentifier(dbname);
 
     final boolean cascade;
     final boolean replDataLocationChanged;
@@ -136,7 +136,7 @@
     // Validate bucketedColumns in new table
     List<String> bucketColumns = MetaStoreServerUtils.validateBucketColumns(newt.getSd());
     if (CollectionUtils.isNotEmpty(bucketColumns)) {
-      String errMsg = "Bucket columns - " + bucketColumns.toString() + " doesn't match with any table columns";
+      String errMsg = "Bucket columns - " + bucketColumns + " doesn't match with any table columns";
       LOG.error(errMsg);
       throw new InvalidOperationException(errMsg);
     }
@@ -162,14 +162,14 @@
       List<Partition> parts;
 
       // Switching tables between catalogs is not allowed.
-      if (!catName.equalsIgnoreCase(newt.getCatName())) {
+      if (!catalogName.equalsIgnoreCase(newt.getCatName())) {
         throw new InvalidOperationException("Tables cannot be moved between catalogs, old catalog" +
-            catName + ", new catalog " + newt.getCatName());
+            catalogName + ", new catalog " + newt.getCatName());
       }
 
       // check if table with the new name already exists
-      if (!newTblName.equals(name) || !newDbName.equals(dbname)) {
-        if (msdb.getTable(catName, newDbName, newTblName, null) != null) {
+      if (!newTblName.equals(tableName) || !newDbName.equals(databaseName)) {
+        if (msdb.getTable(catalogName, newDbName, newTblName, null) != null) {
           throw new InvalidOperationException("new table " + newDbName
               + "." + newTblName + " already exists");
         }
@@ -184,11 +184,11 @@
       msdb.openTransaction();
       // get old table
       // Note: we don't verify stats here; it's done below in alterTableUpdateTableColumnStats.
-      olddb = msdb.getDatabase(catName, dbname);
-      oldt = msdb.getTable(catName, dbname, name, null);
+      olddb = msdb.getDatabase(catalogName, databaseName);
+      oldt = msdb.getTable(catalogName, databaseName, tableName, null);
       if (oldt == null) {
         throw new InvalidOperationException("table " +
-            TableName.getQualified(catName, dbname, name) + " doesn't exist");
+            TableName.getQualified(catalogName, databaseName, tableName) + " doesn't exist");
       }
 
       if (expectedKey != null && expectedValue != null) {
@@ -226,11 +226,10 @@
       boolean partKeysPartiallyEqual = checkPartialPartKeysEqual(oldt.getPartitionKeys(),
           newt.getPartitionKeys());
 
-      if(!oldt.getTableType().equals(TableType.VIRTUAL_VIEW.toString())){
+      if (!oldt.getTableType().equals(TableType.VIRTUAL_VIEW.toString())){
         Map<String, String> properties = environmentContext.getProperties();
-        if (properties == null || (properties != null &&
-            !Boolean.parseBoolean(properties.getOrDefault(HiveMetaHook.ALLOW_PARTITION_KEY_CHANGE,
-                "false")))) {
+        if (properties == null || !Boolean.parseBoolean(properties.getOrDefault(HiveMetaHook.ALLOW_PARTITION_KEY_CHANGE,
+                "false"))) {
           if (!partKeysPartiallyEqual) {
             throw new InvalidOperationException("partition keys can not be changed.");
           }
@@ -251,7 +250,7 @@
               || StringUtils.isEmpty(newt.getSd().getLocation()))
           && (!MetaStoreUtils.isExternalTable(oldt));
 
-      Database db = msdb.getDatabase(catName, newDbName);
+      Database db = msdb.getDatabase(catalogName, newDbName);
 
       boolean renamedTranslatedToExternalTable = rename && MetaStoreUtils.isTranslatedToExternalTable(oldt)
           && MetaStoreUtils.isTranslatedToExternalTable(newt);
@@ -286,8 +285,8 @@
           // Same applies to the ACID tables suffixed with the `txnId`, case with `lockless reads`.
           String oldtRelativePath = wh.getDatabaseManagedPath(olddb).toUri()
               .relativize(srcPath.toUri()).toString();
-          boolean tableInSpecifiedLoc = !oldtRelativePath.equalsIgnoreCase(name)
-                  && !oldtRelativePath.equalsIgnoreCase(name + Path.SEPARATOR);
+          boolean tableInSpecifiedLoc = !oldtRelativePath.equalsIgnoreCase(tableName)
+                  && !oldtRelativePath.equalsIgnoreCase(tableName + Path.SEPARATOR);
 
 
           if (renamedTranslatedToExternalTable || !tableInSpecifiedLoc) {
@@ -323,7 +322,7 @@
             try {
               if (destFs.exists(destPath)) {
                 throw new InvalidOperationException("New location for this table " +
-                        TableName.getQualified(catName, newDbName, newTblName) +
+                        TableName.getQualified(catalogName, newDbName, newTblName) +
                         " already exists : " + destPath);
               }
               // check that src exists and also checks permissions necessary, rename src to dest
@@ -332,18 +331,18 @@
                 dataWasMoved = true;
               }
             } catch (IOException | MetaException e) {
-              LOG.error("Alter Table operation for " + dbname + "." + name + " failed.", e);
-              throw new InvalidOperationException("Alter Table operation for " + dbname + "." + name +
+              LOG.error("Alter Table operation for " + databaseName + "." + tableName + " failed.", e);
+              throw new InvalidOperationException("Alter Table operation for " + databaseName + "." + tableName +
                       " failed to move data due to: '" + getSimpleMessage(e)
                       + "' See hive log file for details.");
             }
 
             if (!HiveMetaStore.isRenameAllowed(olddb, db)) {
-              LOG.error("Alter Table operation for " + TableName.getQualified(catName, dbname, name) +
-                      "to new table = " + TableName.getQualified(catName, newDbName, newTblName) + " failed ");
+              LOG.error("Alter Table operation for " + TableName.getQualified(catalogName, databaseName, tableName) +
+                      "to new table = " + TableName.getQualified(catalogName, newDbName, newTblName) + " failed ");
               throw new MetaException("Alter table not allowed for table " +
-                      TableName.getQualified(catName, dbname, name) +
-                      "to new table = " + TableName.getQualified(catName, newDbName, newTblName));
+                      TableName.getQualified(catalogName, databaseName, tableName) +
+                      "to new table = " + TableName.getQualified(catalogName, newDbName, newTblName));
             }
           }
         }
@@ -353,7 +352,7 @@
           String newTblLocPath = dataWasMoved ? destPath.toUri().getPath() : null;
 
           // also the location field in partition
-          parts = msdb.getPartitions(catName, dbname, name, -1);
+          parts = msdb.getPartitions(catalogName, databaseName, tableName, -1);
           for (Partition part : parts) {
             String oldPartLoc = part.getSd().getLocation();
             if (dataWasMoved && oldPartLoc.contains(oldTblLocPath)) {
@@ -366,10 +365,10 @@
             part.setTableName(newTblName);
           }
           // Do not verify stats parameters on a partitioned table.
-          msdb.alterTable(catName, dbname, name, newt, null);
+          msdb.alterTable(catalogName, databaseName, tableName, newt, null);
           int partitionBatchSize = MetastoreConf.getIntVar(handler.getConf(),
               MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX);
-          String catalogName = catName;
+
           // alterPartition is only for changing the partition location in the table rename
           if (dataWasMoved) {
             Batchable.runBatched(partitionBatchSize, parts, new Batchable<Partition, Void>() {
@@ -384,7 +383,7 @@
           }
           Deadline.checkTimeout();
         } else {
-          msdb.alterTable(catName, dbname, name, newt, writeIdList);
+          msdb.alterTable(catalogName, databaseName, tableName, newt, writeIdList);
         }
       } else {
         // operations other than table rename
@@ -408,10 +407,7 @@
 
           if (runPartitionMetadataUpdate) {
             if (cascade || retainOnColRemoval) {
-              parts = msdb.getPartitions(catName, dbname, name, -1);
-              String catalogName = catName;
-              String databaseName = dbname;
-              String tableName = name;
+              parts = msdb.getPartitions(catalogName, databaseName, tableName, -1);
               Table finalOldt = oldt;
               int partitionBatchSize = MetastoreConf.getIntVar(handler.getConf(),
                       MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX);
@@ -446,17 +442,17 @@
               });
             } else {
               // clear all column stats to prevent incorract behaviour in case same column is reintroduced
-              TableName tableName = new TableName(catName, dbname, name);
-              msdb.deleteAllPartitionColumnStatistics(tableName, writeIdList);
+              msdb.deleteAllPartitionColumnStatistics(
+                  new TableName(catalogName, databaseName, tableName), writeIdList);
             }
             // Don't validate table-level stats for a partitoned table.
-            msdb.alterTable(catName, dbname, name, newt, null);
+            msdb.alterTable(catalogName, databaseName, tableName, newt, null);
           } else {
             LOG.warn("Alter table not cascaded to partitions.");
-            msdb.alterTable(catName, dbname, name, newt, writeIdList);
+            msdb.alterTable(catalogName, databaseName, tableName, newt, writeIdList);
           }
         } else {
-          msdb.alterTable(catName, dbname, name, newt, writeIdList);
+          msdb.alterTable(catalogName, databaseName, tableName, newt, writeIdList);
         }
       }
 
@@ -488,24 +484,22 @@
         // Txn was committed successfully.
         // If data location is changed in replication flow, then need to delete the old path.
         if (replDataLocationChanged) {
-          assert(olddb != null);
-          assert(oldt != null);
           Path deleteOldDataLoc = new Path(oldt.getSd().getLocation());
           boolean isSkipTrash = MetaStoreUtils.isSkipTrash(oldt.getParameters());
           try {
             wh.deleteDir(deleteOldDataLoc, true, isSkipTrash,
                     ReplChangeManager.shouldEnableCm(olddb, oldt));
             LOG.info("Deleted the old data location: {} for the table: {}",
-                    deleteOldDataLoc, dbname + "." + name);
+                    deleteOldDataLoc, databaseName + "." + tableName);
           } catch (MetaException ex) {
             // Eat the exception as it doesn't affect the state of existing tables.
             // Expect, user to manually drop this path when exception and so logging a warning.
             LOG.warn("Unable to delete the old data location: {} for the table: {}",
-                    deleteOldDataLoc, dbname + "." + name);
+                    deleteOldDataLoc, databaseName + "." + tableName);
           }
         }
       } else {
-        LOG.error("Failed to alter table " + TableName.getQualified(catName, dbname, name));
+        LOG.error("Failed to alter table " + TableName.getQualified(catalogName, databaseName, tableName));
         msdb.rollbackTransaction();
         if (!replDataLocationChanged && dataWasMoved) {
           try {
@@ -580,7 +574,7 @@
     }
 
     //alter partition
-    if (part_vals == null || part_vals.size() == 0) {
+    if (CollectionUtils.isEmpty(part_vals)) {
       try {
         msdb.openTransaction();
 
@@ -692,66 +686,64 @@
               + " Check metastore logs for detailed stack." + e.getMessage());
         }
 
-        if (destPath != null) {
-          newPartLoc = destPath.toString();
-          oldPartLoc = oldPart.getSd().getLocation();
-          LOG.info("srcPath:" + oldPartLoc);
-          LOG.info("descPath:" + newPartLoc);
-          srcPath = new Path(oldPartLoc);
-          srcFs = wh.getFs(srcPath);
-          destFs = wh.getFs(destPath);
-          // check that src and dest are on the same file system
-          if (!FileUtils.equalsFileSystem(srcFs, destFs)) {
-            throw new InvalidOperationException("New table location " + destPath
-              + " is on a different file system than the old location "
-              + srcPath + ". This operation is not supported.");
-          }
+        newPartLoc = destPath.toString();
+        oldPartLoc = oldPart.getSd().getLocation();
+        LOG.info("srcPath:" + oldPartLoc);
+        LOG.info("descPath:" + newPartLoc);
+        srcPath = new Path(oldPartLoc);
+        srcFs = wh.getFs(srcPath);
+        destFs = wh.getFs(destPath);
+        // check that src and dest are on the same file system
+        if (!FileUtils.equalsFileSystem(srcFs, destFs)) {
+          throw new InvalidOperationException("New table location " + destPath
+            + " is on a different file system than the old location "
+            + srcPath + ". This operation is not supported.");
+        }
 
-          try {
-            if (srcFs.exists(srcPath)) {
-              if (newPartLoc.compareTo(oldPartLoc) != 0 && destFs.exists(destPath)) {
-                throw new InvalidOperationException("New location for this table "
-                  + tbl.getDbName() + "." + tbl.getTableName()
-                  + " already exists : " + destPath);
-              }
-              //if destPath's parent path doesn't exist, we should mkdir it
-              Path destParentPath = destPath.getParent();
-              if (!wh.mkdirs(destParentPath)) {
-                  throw new MetaException("Unable to create path " + destParentPath);
-              }
+        try {
+          if (srcFs.exists(srcPath)) {
+            if (newPartLoc.compareTo(oldPartLoc) != 0 && destFs.exists(destPath)) {
+              throw new InvalidOperationException("New location for this table "
+                + tbl.getDbName() + "." + tbl.getTableName()
+                + " already exists : " + destPath);
+            }
+            //if destPath's parent path doesn't exist, we should mkdir it
+            Path destParentPath = destPath.getParent();
+            if (!wh.mkdirs(destParentPath)) {
+                throw new MetaException("Unable to create path " + destParentPath);
+            }
 
-              boolean clonePart = Optional.ofNullable(environmentContext)
-                  .map(EnvironmentContext::getProperties)
-                  .map(prop -> prop.get(RENAME_PARTITION_MAKE_COPY))
-                  .map(Boolean::parseBoolean)
-                  .orElse(false);
-              long writeId = new_part.getWriteId();
+            boolean clonePart = Optional.ofNullable(environmentContext)
+                .map(EnvironmentContext::getProperties)
+                .map(prop -> prop.get(RENAME_PARTITION_MAKE_COPY))
+                .map(Boolean::parseBoolean)
+                .orElse(false);
+            long writeId = new_part.getWriteId();
 
-              if (writeId > 0 && clonePart) {
-                LOG.debug("Making a copy of the partition directory: {} under a new location: {}", srcPath, destPath);
+            if (writeId > 0 && clonePart) {
+              LOG.debug("Making a copy of the partition directory: {} under a new location: {}", srcPath, destPath);
 
-                if (!wh.copyDir(srcPath, destPath, ReplChangeManager.shouldEnableCm(db, tbl))) {
-                  LOG.error("Copy failed for source: " + srcPath + " to destination: " + destPath);
-                  throw new IOException("File copy failed.");
-                }
-                addTruncateBaseFile(srcPath, writeId, conf, DataFormat.DROPPED);
-              } else {
-                //rename the data directory
-                wh.renameDir(srcPath, destPath, ReplChangeManager.shouldEnableCm(db, tbl));
-              }
-              LOG.info("Partition directory rename from " + srcPath + " to " + destPath + " done.");
-              dataWasMoved = true;
-            }
-          } catch (IOException e) {
-            LOG.error("Cannot rename partition directory from " + srcPath + " to " + destPath, e);
-            throw new InvalidOperationException("Unable to access src or dest location for partition "
-                + tbl.getDbName() + "." + tbl.getTableName() + " " + new_part.getValues());
-          } catch (MetaException me) {
-            LOG.error("Cannot rename partition directory from " + srcPath + " to " + destPath, me);
-            throw me;
-          }
-          new_part.getSd().setLocation(newPartLoc);
-        }
+              if (!wh.copyDir(srcPath, destPath, ReplChangeManager.shouldEnableCm(db, tbl))) {
+                LOG.error("Copy failed for source: " + srcPath + " to destination: " + destPath);
+                throw new IOException("File copy failed.");
+              }
+              addTruncateBaseFile(srcPath, writeId, conf, DataFormat.DROPPED);
+            } else {
+              //rename the data directory
+              wh.renameDir(srcPath, destPath, ReplChangeManager.shouldEnableCm(db, tbl));
+            }
+            LOG.info("Partition directory rename from " + srcPath + " to " + destPath + " done.");
+            dataWasMoved = true;
+          }
+        } catch (IOException e) {
+          LOG.error("Cannot rename partition directory from " + srcPath + " to " + destPath, e);
+          throw new InvalidOperationException("Unable to access src or dest location for partition "
+              + tbl.getDbName() + "." + tbl.getTableName() + " " + new_part.getValues());
+        } catch (MetaException me) {
+          LOG.error("Cannot rename partition directory from " + srcPath + " to " + destPath, me);
+          throw me;
+        }
+        new_part.getSd().setLocation(newPartLoc);
       } else {
         new_part.getSd().setLocation(oldPart.getSd().getLocation());
       }
@@ -798,10 +790,7 @@
             if (destFs.exists(destPath)) {
               wh.renameDir(destPath, srcPath, false);
             }
-          } catch (MetaException me) {
-            LOG.error("Failed to restore partition data from " + destPath + " to " + srcPath
-                +  " in alter partition failure. Manual restore is needed.");
-          } catch (IOException ioe) {
+          } catch (MetaException | IOException me) {
             LOG.error("Failed to restore partition data from " + destPath + " to " + srcPath
                 +  " in alter partition failure. Manual restore is needed.");
           }

String databaseName = dbname;
String tableName = name;
Table finalOldt = oldt;
int partitionBatchSize = MetastoreConf.getIntVar(handler.getConf(),
MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX);
Batchable.runBatched(partitionBatchSize, parts, new Batchable<Partition, Void>() {
@Override
public List<Void> run(List<Partition> input) throws Exception {
List<Partition> oldParts = new ArrayList<>(input.size());
List<List<String>> partVals = input.stream().map(Partition::getValues).collect(Collectors.toList());
// update changed properties (stats)
oldPart.setParameters(part.getParameters());
msdb.alterPartition(catName, dbname, name, part.getValues(), oldPart, writeIdList);
for (Partition part : input) {
Partition oldPart = new Partition(part);
List<FieldSchema> oldCols = part.getSd().getCols();
part.getSd().setCols(newt.getSd().getCols());
List<ColumnStatistics> colStats = updateOrGetPartitionColumnStats(msdb, catalogName, databaseName,
tableName, part.getValues(), oldCols, finalOldt, part, null, null);
assert (colStats.isEmpty());
if (!cascade) {
oldPart.setParameters(part.getParameters());
oldParts.add(oldPart);
}
}
Deadline.checkTimeout();
if (cascade) {
msdb.alterPartitions(catalogName, databaseName, tableName, partVals, input, newt.getWriteId(),
writeIdList);
} else {
msdb.alterPartitions(catalogName, newDbName, newTblName, partVals, oldParts, newt.getWriteId(),
writeIdList);
}
return Collections.emptyList();
}
}
});
} else {
// clear all column stats to prevent incorract behaviour in case same column is reintroduced
TableName tableName = new TableName(catName, dbname, name);
Expand Down
Loading