Skip to content

HIVE-28988: Handle materialized view commit txn during hive acid repl #5842

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3197,10 +3197,13 @@ public void testMaterializedViewOnAcidTableReplication() throws Throwable {
.run("insert into t1 values (1)")
.run("insert into t1 values (2)")
.run("create materialized view mat_view as select * from t1")
.run("create materialized view mat_view_txn TBLPROPERTIES ('transactional'='true') as select * from t1")
.run("show tables")
.verifyResults(new String[]{"t1", "mat_view"})
.verifyResults(new String[]{"t1", "mat_view", "mat_view_txn"})
.run("select * from mat_view")
.verifyResults(new String[]{"1", "2"})
.run("select * from mat_view_txn")
.verifyResults(new String[]{"1", "2"})
.dump(primaryDbName);

//confirm materialized-view not replicated
Expand All @@ -3217,6 +3220,9 @@ public void testMaterializedViewOnAcidTableReplication() throws Throwable {
.run("alter materialized view mat_view rebuild")
.run("select * from mat_view")
.verifyResults(new String[]{"1", "2", "3"})
.run("alter materialized view mat_view_txn rebuild")
.run("select * from mat_view_txn")
.verifyResults(new String[]{"1", "2", "3"})
.dump(primaryDbName);

//confirm materialized-view not replicated
Expand All @@ -3232,6 +3238,9 @@ public void testMaterializedViewOnAcidTableReplication() throws Throwable {
.run("alter materialized view mat_view disable rewrite")
.run("select * from mat_view")
.verifyResults(new String[]{"1", "2", "3"})
.run("alter materialized view mat_view_txn disable rewrite")
.run("select * from mat_view_txn")
.verifyResults(new String[]{"1", "2", "3"})
.dump(primaryDbName);

//confirm materialized-view not replicated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
import org.apache.hadoop.hive.metastore.utils.StringUtils;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.metadata.HiveFatalException;
Expand All @@ -43,6 +44,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

class CommitTxnHandler extends AbstractEventHandler<CommitTxnMessage> {

Expand Down Expand Up @@ -128,6 +130,20 @@ private List<WriteEventInfo> getAllWriteEventInfo(Context withinContext) throws
})));
}

private List<WriteEventInfo> getAllWriteEventInfoExceptMV(List<WriteEventInfo> writeEventInfoList) {
Copy link
Member

@deniskuzZ deniskuzZ Jun 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we even need to store the MV write events and do the filter later, as done here?

return writeEventInfoList.stream().filter(writeEventInfo -> {
try {
if (writeEventInfo.getTableObj() != null) {
Table table = new Table((org.apache.hadoop.hive.metastore.api.Table) MessageBuilder.getTObj(writeEventInfo.getTableObj(), org.apache.hadoop.hive.metastore.api.Table.class));
return !table.isMaterializedView();
}
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}).collect(Collectors.toList());
}

@Override
public void handle(Context withinContext) throws Exception {
if (!ReplUtils.includeAcidTableInDump(withinContext.hiveConf)) {
Expand Down Expand Up @@ -168,6 +184,10 @@ public void handle(Context withinContext) throws Exception {
}
}

// Filtering out all write event i related to materialized view
if (writeEventInfoList != null) {
writeEventInfoList = getAllWriteEventInfoExceptMV(writeEventInfoList);
}
int numEntry = (writeEventInfoList != null ? writeEventInfoList.size() : 0);
if (numEntry != 0) {
eventMessage.addWriteEventInfo(writeEventInfoList);
Expand Down
Loading