Skip to content

Commit

Permalink
[HUDI-8648] Fix a bug for secondary index deletion (#12447)
Browse files Browse the repository at this point in the history
  • Loading branch information
linliu-code authored Dec 28, 2024
1 parent 4befcdd commit 66a9401
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1190,7 +1190,15 @@ private HoodieData<HoodieRecord> getSecondaryIndexUpdates(HoodieCommitMetadata c
getEngineType(),
indexDefinition)
.union(deletedRecords)
.distinctWithKey(HoodieRecord::getKey, parallelism);
.mapToPair(i -> Pair.of(i.getRecordKey(), i))
.reduceByKey((value1, value2) -> {
if (((HoodieMetadataPayload) value1.getData()).isDeleted()) {
return value2;
} else {
return value1;
}
}, parallelism)
.values();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.hudi.metadata.HoodieMetadataLogRecordReader;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator;
import org.apache.hudi.metadata.SecondaryIndexKeyUtils;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.table.HoodieSparkTable;
Expand All @@ -58,6 +59,7 @@
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
Expand All @@ -68,6 +70,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -95,6 +98,8 @@
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {

Expand Down Expand Up @@ -364,6 +369,94 @@ public void testRepeatedCleanActionsWithMetadataTableEnabled(final HoodieTableTy
validateFilesAfterCleaning(deleteFileList, fileSetBeforeCleaning, fileSetAfterSecondCleaning);
}

@Test
void testReverseLookupSecondaryKeysInternalWithOnlyBaseFileRecord() {
// Look up a single record key; the log is empty, so the result must come
// entirely from the base-file records.
String recordKey = "recordKey";
Set<String> recordKeys = Collections.singleton(recordKey);
Map<String, HoodieRecord<HoodieMetadataPayload>> baseFileRecords = new HashMap<>();
HoodieMetadataLogRecordReader logRecordReader = mock(HoodieMetadataLogRecordReader.class);
List<HoodieRecord<HoodieMetadataPayload>> logRecords = new ArrayList<>();
when(logRecordReader.getRecords()).thenReturn(logRecords);

String secondaryKey = "secondaryKey";
HoodieRecord<HoodieMetadataPayload> secondaryIndexRecord = HoodieMetadataPayload.createSecondaryIndexRecord(
recordKey, secondaryKey, "partitionPath", false);
// Key the base-file map by the full secondary-index key (secondaryKey$recordKey),
// matching how fetchBaseFileAllRecordsByPayloadForSecIndex builds it in production
// and how the sibling test testReverseLookupSecondaryKeysInternal sets it up.
baseFileRecords.put(
SecondaryIndexKeyUtils.constructSecondaryIndexKey(secondaryKey, recordKey),
secondaryIndexRecord);
Map<String, String> r =
HoodieBackedTableMetadata.reverseLookupSecondaryKeysInternal(recordKeys, baseFileRecords, logRecordReader);
// The single live base-file record should yield recordKey -> secondaryKey.
assertEquals(1, r.size());
assertTrue(r.containsKey(recordKey));
assertEquals(secondaryKey, r.get(recordKey));
}

@Test
void testReverseLookupSecondaryKeysInternalWithOnlyLogRecords() {
String recordKey = "recordKey";
Set<String> recordKeys = Collections.singleton(recordKey);
// Base-file map stays empty: exercises the branch that derives the result
// purely from the log records.
Map<String, HoodieRecord<HoodieMetadataPayload>> baseFileRecords = new HashMap<>();
HoodieMetadataLogRecordReader logRecordReader = mock(HoodieMetadataLogRecordReader.class);
List<HoodieRecord<HoodieMetadataPayload>> logRecords = new ArrayList<>();
when(logRecordReader.getRecords()).thenReturn(logRecords);

// Case 1: A single live log record yields recordKey -> secondaryKey.
String secondaryKey = "secondaryKey";
logRecords.add(HoodieMetadataPayload.createSecondaryIndexRecord(
recordKey, secondaryKey, "partitionPath", false));

Map<String, String> r =
HoodieBackedTableMetadata.reverseLookupSecondaryKeysInternal(recordKeys, baseFileRecords, logRecordReader);
assertEquals(1, r.size());
assertTrue(r.containsKey(recordKey));
assertEquals(secondaryKey, r.get(recordKey));

// Case 2: Multiple log records, and the latest record is a tombstone.
// The tombstone carries a different secondary key (secondaryKey + "_new"), so
// its full secondary-index key does not collide with the live entry from
// Case 1; that mapping survives and the result size stays 1.
logRecords.add(HoodieMetadataPayload.createSecondaryIndexRecord(
recordKey, secondaryKey + "_new", "partitionPath", true));
r = HoodieBackedTableMetadata.reverseLookupSecondaryKeysInternal(recordKeys, baseFileRecords, logRecordReader);
assertEquals(1, r.size());

// Case 3: Multiple log records, and the latest record is not a tombstone.
// newRecordKey is not in the lookup set, so the new record must be filtered
// out and the Case 1 mapping remains the only result.
String newSecondaryKey = "newSecondaryKey";
String newRecordKey = "newRecordKey";
logRecords.add(HoodieMetadataPayload.createSecondaryIndexRecord(
newRecordKey, newSecondaryKey, "partitionPath", false));
r = HoodieBackedTableMetadata.reverseLookupSecondaryKeysInternal(recordKeys, baseFileRecords, logRecordReader);
assertEquals(1, r.size());
assertFalse(r.containsKey(newRecordKey));
assertEquals(secondaryKey, r.get(recordKey));
}

@Test
void testReverseLookupSecondaryKeysInternal() {
String recordKey = "recordKey";
Set<String> recordKeys = Collections.singleton(recordKey);
Map<String, HoodieRecord<HoodieMetadataPayload>> baseFileRecords = new HashMap<>();
HoodieMetadataLogRecordReader logRecordReader = mock(HoodieMetadataLogRecordReader.class);
List<HoodieRecord<HoodieMetadataPayload>> logRecords = new ArrayList<>();
when(logRecordReader.getRecords()).thenReturn(logRecords);

// Case 1: Latest log record is a tombstone.
// The tombstone shares the full secondary-index key with the live base-file
// record, so the base-file entry is suppressed and nothing is returned.
String secondaryKey = "secondaryKey";
logRecords.add(HoodieMetadataPayload.createSecondaryIndexRecord(
recordKey, secondaryKey, "partitionPath", true));
baseFileRecords.put(
SecondaryIndexKeyUtils.constructSecondaryIndexKey(secondaryKey, recordKey),
HoodieMetadataPayload.createSecondaryIndexRecord(
recordKey, secondaryKey, "partitionPath", false));
Map<String, String> r =
HoodieBackedTableMetadata.reverseLookupSecondaryKeysInternal(recordKeys, baseFileRecords, logRecordReader);
assertEquals(0, r.size());

// Case 2: Latest log record is not a tombstone.
// A later live record with a new secondary key re-establishes the mapping for
// the record key, while the tombstoned old entry stays excluded.
String newSecondaryKey = "newSecondaryKey";
logRecords.add(HoodieMetadataPayload.createSecondaryIndexRecord(
recordKey, newSecondaryKey, "partitionPath", false));
r = HoodieBackedTableMetadata.reverseLookupSecondaryKeysInternal(recordKeys, baseFileRecords, logRecordReader);
assertEquals(1, r.size());
assertTrue(r.containsKey(recordKey));
assertEquals(newSecondaryKey, r.get(recordKey));
}

private int getNumCompactions(HoodieTableMetaClient metaClient) {
HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
return timeline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
Expand Down Expand Up @@ -72,7 +73,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -826,59 +826,24 @@ protected Map<String, String> getSecondaryKeysForRecordKeys(List<String> recordK
}

private Map<String, String> reverseLookupSecondaryKeys(String partitionName, List<String> recordKeys, FileSlice fileSlice) {
Map<String, String> recordKeyMap = new HashMap<>();
Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers = getOrCreateReaders(partitionName, fileSlice);
try {
HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
if (baseFileReader == null && logRecordScanner == null) {
return Collections.emptyMap();
}

Set<String> keySet = new TreeSet<>(recordKeys);
Set<String> deletedRecordsFromLogs = new HashSet<>();
// Map of recordKey (primaryKey) -> log record that is not deleted for all input recordKeys
Map<String, HoodieRecord<HoodieMetadataPayload>> logRecordsMap = new HashMap<>();
logRecordScanner.getRecords().forEach(record -> {
String recordKey = SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(record.getRecordKey());
HoodieMetadataPayload payload = record.getData();
if (!payload.isDeleted()) { // process only valid records.
if (keySet.contains(recordKey)) {
logRecordsMap.put(recordKey, record);
}
} else {
deletedRecordsFromLogs.add(recordKey);
}
});

// Map of (record-key, secondary-index-record)
Map<String, HoodieRecord<HoodieMetadataPayload>> baseFileRecords = fetchBaseFileAllRecordsByPayloadForSecIndex(baseFileReader, keySet, partitionName);
if (baseFileRecords == null || baseFileRecords.isEmpty()) {
logRecordsMap.forEach((key1, value1) -> {
if (!value1.getData().isDeleted()) {
recordKeyMap.put(key1, SecondaryIndexKeyUtils.getSecondaryKeyFromSecondaryIndexKey(value1.getRecordKey()));
}
});
} else {
// Iterate over all provided log-records, merging them into existing records
logRecordsMap.forEach((key1, value1) -> baseFileRecords.merge(key1, value1, (oldRecord, newRecord) -> {
Option<HoodieRecord<HoodieMetadataPayload>> mergedRecord = HoodieMetadataPayload.combineSecondaryIndexRecord(oldRecord, newRecord);
return mergedRecord.orElse(null);
}));
baseFileRecords.forEach((key, value) -> {
if (!deletedRecordsFromLogs.contains(key)) {
recordKeyMap.put(key, SecondaryIndexKeyUtils.getSecondaryKeyFromSecondaryIndexKey(value.getRecordKey()));
}
});
}
Set<String> keySet = new HashSet<>(recordKeys);
Map<String, HoodieRecord<HoodieMetadataPayload>> baseFileRecords =
fetchBaseFileAllRecordsByPayloadForSecIndex(baseFileReader, keySet, partitionName);
return reverseLookupSecondaryKeysInternal(keySet, baseFileRecords, logRecordScanner);
} catch (IOException ioe) {
throw new HoodieIOException("Error merging records from metadata table for " + recordKeys.size() + " key : ", ioe);
} finally {
if (!reuse) {
closeReader(readers);
}
}
return recordKeyMap;
}

@Override
Expand Down Expand Up @@ -915,8 +880,52 @@ private Map<String, HoodieRecord<HoodieMetadataPayload>> fetchBaseFileAllRecords
return composeRecord(data, partitionName);
}).filter(record -> {
return keySet.contains(SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(record.getRecordKey()));
}).collect(Collectors.toMap(record -> {
return SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(record.getRecordKey());
}, record -> record));
}).collect(Collectors.toMap(HoodieRecord::getRecordKey, record -> record));
}

/**
 * Merges secondary-index records from the base file with those from the log
 * files, returning the current record-key -> secondary-key mapping for the
 * requested record keys. Keys whose latest state is a tombstone are omitted.
 *
 * @param recordKeySet     record keys (primary keys) to look up
 * @param baseFileRecords  base-file records keyed by the full secondary-index
 *                         key (secondaryKey$recordKey); may be null or empty
 * @param logRecordScanner reader over the log records, ordered oldest to latest
 * @return map of record key to its current secondary key
 */
@VisibleForTesting
public static Map<String, String> reverseLookupSecondaryKeysInternal(Set<String> recordKeySet,
Map<String, HoodieRecord<HoodieMetadataPayload>> baseFileRecords,
HoodieMetadataLogRecordReader logRecordScanner) {
// Full secondary-index keys that were tombstoned in the logs.
Set<String> deletedRecordsFromLogs = new HashSet<>();
// Live (non-deleted) log records, keyed by the full secondary-index key.
Map<String, HoodieRecord<HoodieMetadataPayload>> logRecordsMap = new HashMap<>();
// Note that: we read the log records from the oldest to the latest!!!
// If we change the read order, we need to update the following logic accordingly.
logRecordScanner.getRecords().forEach(logRecord -> {
String recordKey = SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(logRecord.getRecordKey());
// Only consider entries for the requested record keys.
if (recordKeySet.contains(recordKey)) {
HoodieMetadataPayload payload = logRecord.getData();
if (!payload.isDeleted()) { // process only valid records.
logRecordsMap.put(logRecord.getRecordKey(), logRecord);
} else {
deletedRecordsFromLogs.add(logRecord.getRecordKey());
}
}
});

Map<String, String> recordKeyMap = new HashMap<>();
if (baseFileRecords == null || baseFileRecords.isEmpty()) {
// No base file: every surviving log record contributes a mapping.
logRecordsMap.forEach((key, value) -> {
// NOTE(review): logRecordsMap only holds non-deleted records (filtered above),
// so this check looks redundant; it also reads secondaryIndexMetadata directly
// instead of going through payload.isDeleted() as the loop above does -- confirm intent.
if (!value.getData().secondaryIndexMetadata.getIsDeleted()) {
recordKeyMap.put(
SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(key),
SecondaryIndexKeyUtils.getSecondaryKeyFromSecondaryIndexKey(value.getRecordKey()));
}
});
} else {
// Iterate over all provided log-records, merging them into existing records
logRecordsMap.forEach((key, value) -> baseFileRecords.merge(key, value, (oldRecord, newRecord) -> {
Option<HoodieRecord<HoodieMetadataPayload>> mergedRecord = HoodieMetadataPayload.combineSecondaryIndexRecord(oldRecord, newRecord);
return mergedRecord.orElse(null);
}));
// Emit every merged record whose full secondary-index key was not tombstoned in the logs.
baseFileRecords.forEach((key, value) -> {
if (!deletedRecordsFromLogs.contains(key)) {
recordKeyMap.put(
SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(value.getRecordKey()),
SecondaryIndexKeyUtils.getSecondaryKeyFromSecondaryIndexKey(value.getRecordKey()));
}
});
}
return recordKeyMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public static String getSecondaryKeyFromSecondaryIndexKey(String key) {
return unescapeSpecialChars(key.substring(0, delimiterIndex));
}

static String constructSecondaryIndexKey(String secondaryKey, String recordKey) {
public static String constructSecondaryIndexKey(String secondaryKey, String recordKey) {
return escapeSpecialChars(secondaryKey) + SECONDARY_INDEX_RECORD_KEY_SEPARATOR + escapeSpecialChars(recordKey);
}

Expand Down

0 comments on commit 66a9401

Please sign in to comment.