Commit b95efa4

HIVE-28976: Enhance Commit message in notification_log to correctly filter events during incremental replication

Details:
* Added write ID and database name to the commit & abort messages
* Updated the filter in CommitTxnHandler to use them during the replication dump process

Testing:
* Added test cases
* Tested on a live cluster

1 parent 2fedc86 · commit b95efa4

17 files changed, +405 −48 lines

hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java

Lines changed: 2 additions & 2 deletions
@@ -668,7 +668,7 @@ public void onCommitTxn(CommitTxnEvent commitTxnEvent, Connection dbConn, SQLGen
       return;
     }
     CommitTxnMessage msg =
-        MessageBuilder.getInstance().buildCommitTxnMessage(commitTxnEvent.getTxnId());
+        MessageBuilder.getInstance().buildCommitTxnMessage(commitTxnEvent.getTxnId(), commitTxnEvent.getDatabases(), commitTxnEvent.getWriteId());

     NotificationEvent event =
         new NotificationEvent(0, now(), EventType.COMMIT_TXN.toString(),
@@ -688,7 +688,7 @@ public void onAbortTxn(AbortTxnEvent abortTxnEvent, Connection dbConn, SQLGenera
       return;
     }
     AbortTxnMessage msg =
-        MessageBuilder.getInstance().buildAbortTxnMessage(abortTxnEvent.getTxnId(), abortTxnEvent.getDbsUpdated());
+        MessageBuilder.getInstance().buildAbortTxnMessage(abortTxnEvent.getTxnId(), abortTxnEvent.getDbsUpdated(), abortTxnEvent.getWriteId());
     NotificationEvent event =
         new NotificationEvent(0, now(), EventType.ABORT_TXN.toString(),
             msgEncoder.getSerializer().serialize(msg));

itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationFilterTransactions.java

Lines changed: 15 additions & 7 deletions
@@ -272,10 +272,10 @@ public void setup() throws Throwable {
     PrimaryEventListenerTestImpl.reset();
     ReplicaEventListenerTestImpl.reset();

-    // Each test always has 8 openTxns, 6 commitTxn, and 2 abortTxns.
+    // Each test always has 9 openTxns, 7 commitTxn, and 2 abortTxns.
     // Note that this is the number that was done on the primary,
     // and some are done on non-replicated database.
-    expected = new EventCount(8, 6, 2);
+    expected = new EventCount(9, 7, 2);
   }

   static void updateTxnMapping(Map<Long, Long> map) throws Exception {
@@ -323,8 +323,11 @@ private void prepareBootstrapData() throws Throwable {
         .run("insert into t999 values (99908)")
         .run("insert into t999 values (99909)")
         .run("insert into t999 values (99910)")
-        .run("drop table t999");
-    txnOffset = 10;
+        .run("drop table t999")
+        .run("create table t10 (id int) clustered by(id) into 3 buckets stored as orc " +
+            "tblproperties (\"transactional\"=\"true\")")
+        .run("insert into t10 values (10)");
+    txnOffset = 11;

     // primaryDbName is replicated, t2 and t2 are ACID tables with initial data.
     // t3 is an ACID table with 2 initial rows, later t3 will be locked to force aborted transaction.
@@ -400,7 +403,8 @@ private void prepareIncrementalData() throws Throwable {
     primary.run("use " + primaryDbName)
         .run("insert into t1 values (2), (3)")
         .run("insert into t2 partition(country='india') values ('chennai')")
-        .run("insert into t2 partition(country='india') values ('pune')");
+        .run("insert into t2 partition(country='india') values ('pune')")
+        .run("truncate table t10");
     prepareAbortTxn(primaryDbName, 222);
     primary.run("use " + otherDbName)
         .run("insert into t1 values (200), (300)")
@@ -481,14 +485,14 @@ private void assertTxnOptimization(boolean optimizationOn, WarehouseInstance.Tup

     // Assert the number of Txn events that occurred on the replica.
     // When optimization is on, filtered has the number of Txn events that are expected to have been filtered.
-    // When optimization is off, filtered should be all all 0s.
+    // When optimization is off, filtered should be all 0s.
     Assert.assertEquals(expected.getCountOpenTxn() - filtered.getCountOpenTxn(), ReplicaEventListenerTestImpl.getCountOpenTxn());
     Assert.assertEquals(expected.getCountCommitTxn() - filtered.getCountCommitTxn(), ReplicaEventListenerTestImpl.getCountCommitTxn());
     Assert.assertEquals(expected.getCountAbortTxn() - filtered.getCountAbortTxn(), ReplicaEventListenerTestImpl.getCountAbortTxn());

     // Assert the number of Txn event files found.
     // When optimization is on, filtered has the number of Txn events that are expected to have been filtered.
-    // When optimization is off, filtered should be all all 0s.
+    // When optimization is off, filtered should be all 0s.
     // Note that when optimization is on, there should never be optnTxn events.
     Assert.assertEquals(optimizationOn ? 0 : expected.getCountOpenTxn(), openTxns.size());
     Assert.assertEquals(expected.getCountCommitTxn() - filtered.getCountCommitTxn(), commitTxns.size());
@@ -501,5 +505,9 @@ private void assertTxnOptimization(boolean optimizationOn, WarehouseInstance.Tup
     for (Map.Entry<Long, Long> mapping : replicaTxnMapping.entrySet()) {
       Assert.assertEquals(mapping.getKey().longValue() - txnOffset, mapping.getValue().longValue());
     }
+    Map<Long, Long> postReplicationReplTxnMap = new HashMap<>();
+    // In both the cases, the post replication REPL_TXN_MAP should be empty.
+    TestReplicationFilterTransactions.updateTxnMapping(postReplicationReplTxnMap);
+    Assert.assertEquals(0, postReplicationReplTxnMap.size());
   }
 }

ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java

Lines changed: 7 additions & 2 deletions
@@ -30,6 +30,7 @@

 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;

 class AbortTxnHandler extends AbstractEventHandler<AbortTxnMessage> {

@@ -50,8 +51,12 @@ public void handle(Context withinContext) throws Exception {

     if (ReplUtils.filterTransactionOperations(withinContext.hiveConf)) {
       String contextDbName = StringUtils.normalizeIdentifier(withinContext.replScope.getDbName());
-      JSONAbortTxnMessage abortMsg = (JSONAbortTxnMessage)eventMessage;
-      if ((abortMsg.getDbsUpdated() == null) || !abortMsg.getDbsUpdated().contains(contextDbName)) {
+      List<Long> writeIds = eventMessage.getWriteIds();
+      List<String> dbsUpdated = eventMessage.getDbsUpdated()
+              .stream()
+              .map(StringUtils::normalizeIdentifier)
+              .collect(Collectors.toList());
+      if ((writeIds == null || writeIds.isEmpty() || !dbsUpdated.contains(contextDbName))) {
         LOG.info("Filter out #{} ABORT_TXN message : {}", fromEventId(), eventMessageAsJSON);
         return;
       }
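
The new ABORT_TXN filter reduces to a simple predicate: dump the event only when the aborted transaction allocated write IDs and one of its updated databases is the database being replicated. A minimal standalone sketch of that predicate follows, using plain JDK types only; lowercasing stands in for StringUtils.normalizeIdentifier, which is an assumption of this sketch.

import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

public final class AbortTxnFilterSketch {

  // Mirrors the new check in AbortTxnHandler: dump only when the aborted txn
  // carries write IDs and one of its updated databases is the replicated one.
  static boolean shouldDump(String contextDbName, List<Long> writeIds, List<String> dbsUpdated) {
    if (writeIds == null || writeIds.isEmpty()) {
      return false;                                  // read-only/empty txn: nothing to replicate
    }
    List<String> normalized = dbsUpdated.stream()
        .map(db -> db.toLowerCase(Locale.ROOT))      // stand-in for StringUtils.normalizeIdentifier
        .collect(Collectors.toList());
    return normalized.contains(contextDbName.toLowerCase(Locale.ROOT));
  }

  public static void main(String[] args) {
    // Aborted txn that wrote to the replicated db "sales": kept in the dump.
    System.out.println(shouldDump("sales", Arrays.asList(7L), Arrays.asList("SALES")));    // true
    // Aborted txn that only touched some other db: filtered out.
    System.out.println(shouldDump("sales", Arrays.asList(9L), Arrays.asList("other_db"))); // false
  }
}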

ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java

Lines changed: 38 additions & 7 deletions
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
 import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
 import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
 import org.apache.hadoop.hive.metastore.utils.StringUtils;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
@@ -43,7 +44,9 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;

 class CommitTxnHandler extends AbstractEventHandler<CommitTxnMessage> {
@@ -173,20 +176,48 @@ public void handle(Context withinContext) throws Exception {
     }

     List<WriteEventInfo> writeEventInfoList = null;
+    List<WriteEventInfo> allWriteEventInfoExceptMV = null;
     if (replicatingAcidEvents) {
       writeEventInfoList = getAllWriteEventInfo(withinContext);

-      if (ReplUtils.filterTransactionOperations(withinContext.hiveConf)
-          && (writeEventInfoList == null || writeEventInfoList.size() == 0)) {
-        // If optimizing transactions, no need to dump this one
-        // if there were no write events.
-        return;
+      if (writeEventInfoList != null) {
+        allWriteEventInfoExceptMV = getAllWriteEventInfoExceptMV(writeEventInfoList);
+      }
+      String dbName = StringUtils.normalizeIdentifier(withinContext.replScope.getDbName());
+
+      if (ReplUtils.filterTransactionOperations(withinContext.hiveConf)) {
+        List<Long> writeIds = eventMessage.getWriteIds();
+        List<String> databases = Optional.ofNullable(eventMessage.getDatabases())
+                .orElse(Collections.emptyList())
+                .stream()
+                .map(StringUtils::normalizeIdentifier)
+                .collect(Collectors.toList());
+
+        // Truth Table
+        // Operation         | writeIds | writeEventInfoList | databases | allWriteEventInfoExceptMV  | Output
+        // Read              | null     | null               | null      | same as writeEventInfoList | Skip
+        // Insert            | not null | not null           | not null  | same                       | Dump
+        // Truncate          | not null | null               | not null  | same                       | Dump
+        // Materialized view | not null | not null           | not null  | different                  | Skip
+
+        boolean shouldSkip = (writeIds == null || writeIds.isEmpty() || !databases.contains(dbName));
+        if (writeEventInfoList != null && !writeEventInfoList.isEmpty()) {
+          shouldSkip = writeEventInfoList.size() != allWriteEventInfoExceptMV.size();
+        }
+
+        if (shouldSkip) {
+          // If optimizing transactions, no need to dump this one
+          // if there were no write events.
+          LOG.debug("skipping commit txn event for db: {}, writeIds: {}, writeEventInfoList: {}, databases: {}",
+                  dbName, writeIds, writeEventInfoList, databases);
+          return;
+        }
       }
     }

-    // Filtering out all write event infos related to materialized view
+    // Filtering out all write event info related to materialized view
     if (writeEventInfoList != null) {
-      writeEventInfoList = getAllWriteEventInfoExceptMV(writeEventInfoList);
+      writeEventInfoList = allWriteEventInfoExceptMV;
     }
     int numEntry = (writeEventInfoList != null ? writeEventInfoList.size() : 0);
     if (numEntry != 0) {
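
The truth table above carries the core reasoning of the change, and the same decision can be read as a standalone function. The sketch below is a paraphrase of the new shouldSkip logic using plain collections, not the handler itself; the two count parameters stand in for the sizes of writeEventInfoList and allWriteEventInfoExceptMV, and lowercasing stands in for StringUtils.normalizeIdentifier.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

public final class CommitTxnFilterSketch {

  // Mirrors the skip decision in CommitTxnHandler when the transaction-filtering
  // optimization (ReplUtils.filterTransactionOperations) is enabled:
  //  - skip txns with no write IDs or that did not touch the replicated db;
  //  - if write events exist, skip only when all of them belong to materialized views.
  static boolean shouldSkip(String replicatedDb,
                            List<Long> writeIds,
                            List<String> databases,
                            int writeEventInfoCount,
                            int nonMvWriteEventInfoCount) {
    List<String> normalized = (databases == null ? Collections.<String>emptyList() : databases)
        .stream()
        .map(db -> db.toLowerCase(Locale.ROOT))      // stand-in for StringUtils.normalizeIdentifier
        .collect(Collectors.toList());

    boolean skip = writeIds == null || writeIds.isEmpty()
        || !normalized.contains(replicatedDb.toLowerCase(Locale.ROOT));
    if (writeEventInfoCount > 0) {
      // Write events exist: dump unless every one of them was a materialized-view write.
      skip = writeEventInfoCount != nonMvWriteEventInfoCount;
    }
    return skip;
  }

  public static void main(String[] args) {
    // The four calls correspond, in order, to the Read, Insert, Truncate, and
    // Materialized-view rows of the truth table.
    System.out.println(shouldSkip("sales", null, null, 0, 0));                                   // true  (Skip)
    System.out.println(shouldSkip("sales", Arrays.asList(4L), Arrays.asList("sales"), 1, 1));    // false (Dump)
    System.out.println(shouldSkip("sales", Arrays.asList(5L), Arrays.asList("sales"), 0, 0));    // false (Dump)
    System.out.println(shouldSkip("sales", Arrays.asList(6L), Arrays.asList("sales"), 2, 0));    // true  (Skip)
  }
}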

standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AbortTxnEvent.java

Lines changed: 14 additions & 4 deletions
@@ -37,29 +37,32 @@ public class AbortTxnEvent extends ListenerEvent {
   private final Long txnId;
   private final TxnType txnType;
   private final List<String> dbsUpdated;
+  private final List<Long> writeId;

   public AbortTxnEvent(Long transactionId, IHMSHandler handler) {
-    this(transactionId, null, handler, null);
+    this(transactionId, null, handler, null, null);
   }

   public AbortTxnEvent(Long transactionId, TxnType txnType) {
-    this(transactionId, txnType, null, null);
+    this(transactionId, txnType, null, null, null);
   }

   /**
    * @param transactionId Unique identification for the transaction that got rolledback.
    * @param txnType type of transaction
    * @param handler handler that is firing the event
    * @param dbsUpdated list of databases that had update events
+   * @param writeId write id for transaction
    */
-  public AbortTxnEvent(Long transactionId, TxnType txnType, IHMSHandler handler, List<String> dbsUpdated) {
+  public AbortTxnEvent(Long transactionId, TxnType txnType, IHMSHandler handler, List<String> dbsUpdated, List<Long> writeId) {
     super(true, handler);
     this.txnId = transactionId;
     this.txnType = txnType;
     this.dbsUpdated = new ArrayList<String>();
     if (dbsUpdated != null) {
-      this.dbsUpdated.addAll(dbsUpdated);;
+      this.dbsUpdated.addAll(dbsUpdated);
     }
+    this.writeId = writeId == null ? new ArrayList<>() : writeId;
   }

   /**
@@ -83,4 +86,11 @@ public TxnType getTxnType() {
   public List<String> getDbsUpdated() {
     return dbsUpdated;
   }
+
+  /**
+   * @return List of write ids which are associated with abort txn
+   */
+  public List<Long> getWriteId() {
+    return writeId;
+  }
 }

standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CommitTxnEvent.java

Lines changed: 25 additions & 3 deletions
@@ -23,6 +23,8 @@
 import org.apache.hadoop.hive.metastore.IHMSHandler;
 import org.apache.hadoop.hive.metastore.api.TxnType;

+import java.util.List;
+
 /**
  * CommitTxnEvent
  * Event generated for commit transaction operation
@@ -33,24 +35,30 @@ public class CommitTxnEvent extends ListenerEvent {

   private final Long txnId;
   private final TxnType txnType;
+  private final List<Long> writeId;
+  private final List<String> databases;

   public CommitTxnEvent(Long transactionId, IHMSHandler handler) {
-    this(transactionId, null, handler);
+    this(transactionId, null, handler, null, null);
   }

   public CommitTxnEvent(Long transactionId, TxnType txnType) {
-    this(transactionId, txnType, null);
+    this(transactionId, txnType, null, null, null);
   }

   /**
    * @param transactionId Unique identification for the transaction just got committed.
    * @param txnType type of transaction
    * @param handler handler that is firing the event
+   * @param databases list of databases for which commit txn event is fired
+   * @param writeId write id for transaction
    */
-  public CommitTxnEvent(Long transactionId, TxnType txnType, IHMSHandler handler) {
+  public CommitTxnEvent(Long transactionId, TxnType txnType, IHMSHandler handler, List<String> databases, List<Long> writeId) {
     super(true, handler);
     this.txnId = transactionId;
     this.txnType = txnType;
+    this.writeId = writeId;
+    this.databases = databases;
   }

   /**
@@ -66,4 +74,18 @@ public Long getTxnId() {
   public TxnType getTxnType() {
     return txnType;
   }
+
+  /**
+   * @return List of write ids for which commit txn event is fired
+   */
+  public List<Long> getWriteId() {
+    return writeId;
+  }
+
+  /**
+   * @return List of databases for which commit txn event is fired
+   */
+  public List<String> getDatabases() {
+    return databases;
+  }
 }
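
For code that fires the event, the enriched five-argument constructor simply carries the extra lists through to the listener, while the older two-argument constructors keep working and leave them null. A hedged usage sketch follows; the txn id, database, and write id values are hypothetical, and the IHMSHandler argument is omitted.

import java.util.Arrays;

import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.metastore.events.CommitTxnEvent;

public final class CommitTxnEventExample {
  public static void main(String[] args) {
    CommitTxnEvent enriched = new CommitTxnEvent(
        42L,                       // txn id (hypothetical)
        TxnType.DEFAULT,           // txn type
        null,                      // IHMSHandler, omitted in this sketch
        Arrays.asList("sales"),    // databases written by the txn (hypothetical)
        Arrays.asList(7L));        // write ids allocated to the txn (hypothetical)

    // The listener copies these lists into the CommitTxnMessage stored in NOTIFICATION_LOG.
    System.out.println(enriched.getDatabases() + " / " + enriched.getWriteId());

    // Older call sites keep compiling; databases and write ids simply stay null.
    CommitTxnEvent legacy = new CommitTxnEvent(43L, TxnType.DEFAULT);
    System.out.println(legacy.getWriteId());   // null
  }
}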

standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/AbortTxnMessage.java

Lines changed: 2 additions & 0 deletions
@@ -36,4 +36,6 @@ protected AbortTxnMessage() {
   public abstract Long getTxnId();

   public abstract List<String> getDbsUpdated();
+
+  public abstract List<Long> getWriteIds();
 }

standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java

Lines changed: 4 additions & 4 deletions
@@ -303,12 +303,12 @@ public OpenTxnMessage buildOpenTxnMessage(Long fromTxnId, Long toTxnId) {
     return new JSONOpenTxnMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, fromTxnId, toTxnId, now());
   }

-  public CommitTxnMessage buildCommitTxnMessage(Long txnId) {
-    return new JSONCommitTxnMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnId, now());
+  public CommitTxnMessage buildCommitTxnMessage(Long txnId, List<String> databases, List<Long> writeIds) {
+    return new JSONCommitTxnMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnId, now(), databases, writeIds);
   }

-  public AbortTxnMessage buildAbortTxnMessage(Long txnId, List<String> dbsUpdated) {
-    return new JSONAbortTxnMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnId, now(), dbsUpdated);
+  public AbortTxnMessage buildAbortTxnMessage(Long txnId, List<String> dbsUpdated, List<Long> writeIds) {
+    return new JSONAbortTxnMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnId, now(), dbsUpdated, writeIds);
   }

   public AllocWriteIdMessage buildAllocWriteIdMessage(List<TxnToWriteId> txnToWriteIdList,
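
Callers of the builder now pass the database list and write IDs along with the transaction id; in DbNotificationListener (above) these come from CommitTxnEvent.getDatabases()/getWriteId() and AbortTxnEvent.getDbsUpdated()/getWriteId(). A small usage sketch with hypothetical values, assuming the Hive metastore classes are on the classpath:

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage;
import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;

public final class TxnMessageBuilderExample {
  public static void main(String[] args) {
    // Hypothetical values; in DbNotificationListener they come from the
    // CommitTxnEvent / AbortTxnEvent getters shown above.
    List<String> databases = Arrays.asList("sales");
    List<Long> writeIds = Arrays.asList(12L);

    CommitTxnMessage commitMsg =
        MessageBuilder.getInstance().buildCommitTxnMessage(100L, databases, writeIds);
    AbortTxnMessage abortMsg =
        MessageBuilder.getInstance().buildAbortTxnMessage(101L, databases, writeIds);

    // The extra fields now travel inside the NOTIFICATION_LOG payload, so the
    // replication dump can filter commit/abort events without extra lookups.
    System.out.println(commitMsg.getTxnId() + " -> " + commitMsg.getWriteIds());
    System.out.println(abortMsg.getTxnId() + " -> " + abortMsg.getDbsUpdated());
  }
}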

standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAbortTxnMessage.java

Lines changed: 9 additions & 1 deletion
@@ -43,19 +43,22 @@ public class JSONAbortTxnMessage extends AbortTxnMessage {

   @JsonProperty
   private List<String> dbsUpdated;
+  @JsonProperty
+  private List<Long> writeIds;

   /**
    * Default constructor, needed for Jackson.
    */
   public JSONAbortTxnMessage() {
   }

-  public JSONAbortTxnMessage(String server, String servicePrincipal, Long txnid, Long timestamp, List<String> dbsUpdated) {
+  public JSONAbortTxnMessage(String server, String servicePrincipal, Long txnid, Long timestamp, List<String> dbsUpdated, List<Long> writeIds) {
     this.timestamp = timestamp;
     this.txnid = txnid;
     this.server = server;
     this.servicePrincipal = servicePrincipal;
     this.dbsUpdated = dbsUpdated;
+    this.writeIds = writeIds;
   }

   @Override
@@ -88,6 +91,11 @@ public List<String> getDbsUpdated() {
     return dbsUpdated;
   }

+  @Override
+  public List<Long> getWriteIds() {
+    return writeIds;
+  }
+
   @Override
   public String toString() {
     try {

standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java

Lines changed: 6 additions & 0 deletions
@@ -72,6 +72,12 @@ public JSONCommitTxnMessage(String server, String servicePrincipal, Long txnid,
     this.files = null;
   }

+  public JSONCommitTxnMessage(String server, String servicePrincipal, Long txnid, Long timestamp, List<String> databases, List<Long> writeIds) {
+    this(server, servicePrincipal, txnid, timestamp);
+    this.databases = databases;
+    this.writeIds = writeIds;
+  }
+
   @Override
   public Long getTxnId() {
     return txnid;