Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,23 +139,53 @@ Pair<Pair<Integer, Long>, Boolean> processKeyDeletes(Map<String, PurgedKey> keyB
String snapTableKey, UUID expectedPreviousSnapshotId) throws IOException {
long startTime = Time.monotonicNow();
Pair<Pair<Integer, Long>, Boolean> purgeResult = Pair.of(Pair.of(0, 0L), false);

// Filter out empty files (files with no blocks) before sending to SCM
Map<String, PurgedKey> nonEmptyKeyBlocksList = keyBlocksList.entrySet().stream()
.filter(entry -> entry.getValue().getBlockGroup() != null &&
entry.getValue().getBlockGroup().getDeletedBlocks() != null &&
!entry.getValue().getBlockGroup().getDeletedBlocks().isEmpty())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

if (LOG.isDebugEnabled()) {
LOG.debug("Send {} key(s) to SCM: {}",
keyBlocksList.size(), keyBlocksList);
LOG.debug("Send {} key(s) to SCM (filtered {} empty files): {}",
nonEmptyKeyBlocksList.size(), keyBlocksList.size() - nonEmptyKeyBlocksList.size(), nonEmptyKeyBlocksList);
} else if (LOG.isInfoEnabled()) {
int logSize = 10;
if (keyBlocksList.size() < logSize) {
logSize = keyBlocksList.size();
if (nonEmptyKeyBlocksList.size() < logSize) {
logSize = nonEmptyKeyBlocksList.size();
}
LOG.info("Send {} key(s) to SCM, first {} keys: {}",
keyBlocksList.size(), logSize, keyBlocksList.entrySet().stream().limit(logSize)
LOG.info("Send {} key(s) to SCM (filtered {} empty files), first {} keys: {}",
nonEmptyKeyBlocksList.size(), keyBlocksList.size() - nonEmptyKeyBlocksList.size(), logSize,
nonEmptyKeyBlocksList.entrySet().stream().limit(logSize)
.map(Map.Entry::getValue).collect(Collectors.toSet()));
}
List<DeleteBlockGroupResult> blockDeletionResults =
scmClient.deleteKeyBlocks(keyBlocksList.values().stream()
.map(PurgedKey::getBlockGroup).collect(Collectors.toList()));
LOG.info("{} BlockGroup deletion are acked by SCM in {} ms",
keyBlocksList.size(), Time.monotonicNow() - startTime);
List<DeleteBlockGroupResult> blockDeletionResults;
if (nonEmptyKeyBlocksList.isEmpty()) {
// Skip SCM call if all files are empty
blockDeletionResults = new ArrayList<>();
LOG.info("Skipping SCM call as all {} files are empty", keyBlocksList.size());
} else {
blockDeletionResults =
scmClient.deleteKeyBlocks(nonEmptyKeyBlocksList.values().stream()
.map(PurgedKey::getBlockGroup).collect(Collectors.toList()));
}

// Add successful results for empty files (no need to send to SCM)
Map<String, PurgedKey> emptyKeyBlocksList = keyBlocksList.entrySet().stream()
.filter(entry -> !nonEmptyKeyBlocksList.containsKey(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

for (PurgedKey emptyKey : emptyKeyBlocksList.values()) {
// Create a successful result for empty files
DeleteBlockGroupResult emptyFileResult = new DeleteBlockGroupResult(
emptyKey.getBlockGroup().getGroupID(), new ArrayList<>());
blockDeletionResults.add(emptyFileResult);
}

LOG.info("{} BlockGroup deletion are acked by SCM in {} ms ({} non-empty, {} empty)",
keyBlocksList.size(), Time.monotonicNow() - startTime,
nonEmptyKeyBlocksList.size(), emptyKeyBlocksList.size());
if (blockDeletionResults != null) {
long purgeStartTime = Time.monotonicNow();
purgeResult = submitPurgeKeysRequest(blockDeletionResults, keyBlocksList, keysToModify, renameEntries,
Expand Down
Loading