Skip to content

[HUDI-9421] Update restore/rollback/indexing planning and instant generation #13368

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1113,21 +1113,20 @@ private List<String> getInstantsToRollbackForLazyCleanPolicy(HoodieTableMetaClie
@Deprecated
public boolean rollback(final String commitInstantTime, Option<HoodiePendingRollbackInfo> pendingRollbackInfo,
boolean skipLocking, boolean skipVersionCheck) throws HoodieRollbackException {
final String rollbackInstantTime = pendingRollbackInfo.map(entry -> entry.getRollbackInstant().requestedTime())
.orElseGet(() -> createNewInstantTime(!skipLocking));
return rollback(commitInstantTime, pendingRollbackInfo, rollbackInstantTime, skipLocking, skipVersionCheck);
return rollback(commitInstantTime, pendingRollbackInfo, Option.empty(), skipLocking, skipVersionCheck);
}

/**
* @param commitInstantTime Instant time of the commit
* @param pendingRollbackInfo pending rollback instant and plan if rollback failed from previous attempt.
* @param suppliedRollbackInstantTime the user provided instant to use for the rollback. This is only set for rolling back instants on the metadata table.
* @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
* @throws HoodieRollbackException if rollback cannot be performed successfully
* @deprecated Rollback the inflight record changes with the given commit time. This
* will be removed in future in favor of {@link BaseHoodieWriteClient#restoreToInstant(String, boolean)}.
*/
@Deprecated
public boolean rollback(final String commitInstantTime, Option<HoodiePendingRollbackInfo> pendingRollbackInfo, String rollbackInstantTime,
public boolean rollback(final String commitInstantTime, Option<HoodiePendingRollbackInfo> pendingRollbackInfo, Option<String> suppliedRollbackInstantTime,
boolean skipLocking, boolean skipVersionCheck) throws HoodieRollbackException {
LOG.info("Begin rollback of instant {} for table {}", commitInstantTime, config.getBasePath());
final Timer.Context timerContext = this.metrics.getRollbackCtx();
Expand All @@ -1136,36 +1135,48 @@ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRoll
Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
.filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
.findFirst());
if (commitInstantOpt.isPresent() || pendingRollbackInfo.isPresent()) {
LOG.info("Scheduling Rollback at instant time : {} "
+ "(exists in active timeline: {}), with rollback plan: {} for table {}",
rollbackInstantTime, commitInstantOpt.isPresent(), pendingRollbackInfo.isPresent(), config.getBasePath());
Option<HoodieRollbackPlan> rollbackPlanOption = pendingRollbackInfo.map(entry -> Option.of(entry.getRollbackPlan()))
.orElseGet(() -> table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(),
false));
if (rollbackPlanOption.isPresent()) {
// There can be a case where the inflight rollback failed after the instant files
// are deleted for commitInstantTime, so that commitInstantOpt is empty as it is
// not present in the timeline. In such a case, the hoodie instant instance
// is reconstructed to allow the rollback to be reattempted, and the deleteInstants
// is set to false since they are already deleted.
// Execute rollback
HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent()
? table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true, skipLocking)
: table.rollback(context, rollbackInstantTime, table.getMetaClient().createNewInstant(
HoodieInstant.State.INFLIGHT, rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime),
false, skipLocking);
if (timerContext != null) {
long durationInMs = metrics.getDurationInMs(timerContext.stop());
metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted());
Option<HoodieRollbackPlan> rollbackPlanOption;
String rollbackInstantTime;
if (pendingRollbackInfo.isPresent()) {
rollbackPlanOption = Option.of(pendingRollbackInfo.get().getRollbackPlan());
rollbackInstantTime = pendingRollbackInfo.get().getRollbackInstant().requestedTime();
} else {
if (commitInstantOpt.isEmpty()) {
LOG.warn("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath());
return false;
}
if (!skipLocking) {
txnManager.beginTransaction(Option.empty(), Option.empty());
}
try {
rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> createNewInstantTime(false));
rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(), false);
} finally {
if (!skipLocking) {
txnManager.endTransaction(Option.empty());
}
return true;
} else {
throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitInstantTime);
}
}

if (rollbackPlanOption.isPresent()) {
// There can be a case where the inflight rollback failed after the instant files
// are deleted for commitInstantTime, so that commitInstantOpt is empty as it is
// not present in the timeline. In such a case, the hoodie instant instance
// is reconstructed to allow the rollback to be reattempted, and the deleteInstants
// is set to false since they are already deleted.
// Execute rollback
HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent()
? table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true, skipLocking)
: table.rollback(context, rollbackInstantTime, table.getMetaClient().createNewInstant(
HoodieInstant.State.INFLIGHT, rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime),
false, skipLocking);
if (timerContext != null) {
long durationInMs = metrics.getDurationInMs(timerContext.stop());
metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted());
}
return true;
} else {
LOG.warn("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath());
return false;
throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitInstantTime);
}
} catch (Exception e) {
throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitInstantTime, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,7 @@ public boolean rollback(final String commitInstantTime) throws HoodieRollbackExc
public boolean rollback(final String commitInstantTime, String rollbackInstantTimestamp) throws HoodieRollbackException {
HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty());
Option<HoodiePendingRollbackInfo> pendingRollbackInfo = tableServiceClient.getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
return tableServiceClient.rollback(commitInstantTime, pendingRollbackInfo, rollbackInstantTimestamp, false, false);
return tableServiceClient.rollback(commitInstantTime, pendingRollbackInfo, false, false);
}

/**
Expand Down Expand Up @@ -859,8 +859,13 @@ private Pair<String, Option<HoodieRestorePlan>> scheduleAndGetRestorePlan(final
if (failedRestore.isPresent() && savepointToRestoreTimestamp.equals(RestoreUtils.getSavepointToRestoreTimestamp(table, failedRestore.get()))) {
return Pair.of(failedRestore.get().requestedTime(), Option.of(RestoreUtils.getRestorePlan(table.getMetaClient(), failedRestore.get())));
}
final String restoreInstantTimestamp = createNewInstantTime();
return Pair.of(restoreInstantTimestamp, table.scheduleRestore(context, restoreInstantTimestamp, savepointToRestoreTimestamp));
txnManager.beginTransaction(Option.empty(), Option.empty());
try {
final String restoreInstantTimestamp = createNewInstantTime();
return Pair.of(restoreInstantTimestamp, table.scheduleRestore(context, restoreInstantTimestamp, savepointToRestoreTimestamp));
} finally {
txnManager.endTransaction(Option.empty());
}
}

/**
Expand Down Expand Up @@ -1035,10 +1040,15 @@ public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String
* @return instant time for the requested INDEX action
*/
public Option<String> scheduleIndexing(List<MetadataPartitionType> partitionTypes, List<String> partitionPaths) {
String instantTime = createNewInstantTime();
Option<HoodieIndexPlan> indexPlan = createTable(config)
.scheduleIndexing(context, instantTime, partitionTypes, partitionPaths);
return indexPlan.isPresent() ? Option.of(instantTime) : Option.empty();
txnManager.beginTransaction(Option.empty(), Option.empty());
try {
String instantTime = createNewInstantTime(false);
Option<HoodieIndexPlan> indexPlan = createTable(config)
.scheduleIndexing(context, instantTime, partitionTypes, partitionPaths);
return indexPlan.map(plan -> instantTime);
} finally {
txnManager.endTransaction(Option.empty());
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.avro.model.HoodieIndexPlan;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.timeline.HoodieInstant;
Expand Down Expand Up @@ -72,8 +71,6 @@ public class ScheduleIndexActionExecutor<T, I, K, O> extends BaseActionExecutor<

private final List<String> partitionPaths;

private final TransactionManager txnManager;

public ScheduleIndexActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, I, K, O> table,
Expand All @@ -83,7 +80,6 @@ public ScheduleIndexActionExecutor(HoodieEngineContext context,
super(context, config, table, instantTime);
this.partitionIndexTypes = partitionIndexTypes;
this.partitionPaths = partitionPaths;
this.txnManager = new TransactionManager(config, table.getStorage());
}

@Override
Expand All @@ -107,7 +103,6 @@ public Option<HoodieIndexPlan> execute() {
.filter(p -> requestedPartitions.contains(p.getPartitionPath())).collect(Collectors.toList());
final HoodieInstant indexInstant = instantGenerator.getIndexRequestedInstant(instantTime);
try {
this.txnManager.beginTransaction(Option.of(indexInstant), Option.empty());
// get last completed instant
Option<HoodieInstant> indexUptoInstant = table.getActiveTimeline().getContiguousCompletedWriteTimeline().lastInstant();
if (indexUptoInstant.isPresent()) {
Expand All @@ -124,8 +119,6 @@ public Option<HoodieIndexPlan> execute() {
LOG.error("Could not initialize file groups", e);
// abort gracefully
abort(indexInstant);
} finally {
this.txnManager.endTransaction(Option.of(indexInstant));
}

return Option.empty();
Expand Down
Loading