Skip to content
This repository was archived by the owner on Jan 14, 2025. It is now read-only.

Commit

Permalink
Fix HoodieWriteStatus to track all failed records for error table
Browse files Browse the repository at this point in the history
  • Loading branch information
firecast committed Oct 15, 2019
1 parent 7910951 commit 9da39cb
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.uber.marmaray.common.configuration;

import com.google.common.base.Optional;
import com.uber.marmaray.common.sinks.hoodie.HoodieWriteStatus;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.table.HoodieTableConfig;
Expand Down Expand Up @@ -198,7 +199,7 @@ public class HoodieConfiguration implements Serializable {
* Hoodie Write status class
*/
public static final String HOODIE_WRITE_STATUS_CLASS = HOODIE_COMMON_PROPERTY_PREFIX + "write_status_class";
public static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getCanonicalName();
public static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = HoodieWriteStatus.class.getCanonicalName();
// Hoodie metrics config.
/**
* Hoodie metrics prefix
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,22 @@
package com.uber.marmaray.common.sinks.hoodie;

import org.apache.hudi.WriteStatus;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;

import java.util.Map;
import java.util.Optional;
import java.util.*;

/**
* Helper class to change default behavior for {@link WriteStatus}
*/
public class HoodieWriteStatus extends WriteStatus {

private long totalRecords;
private long totalErrorRecords = 0;

private final HashMap<HoodieKey, Throwable> errors = new HashMap<>();
private final List<HoodieRecord> failedRecords = new ArrayList<>();

public HoodieWriteStatus(Boolean trackSuccessRecords, Double failureFraction) {
super(trackSuccessRecords, failureFraction);
Expand All @@ -43,8 +47,55 @@ public void markSuccess(final HoodieRecord record, final Option<Map<String, Stri
this.totalRecords++;
}

/**
* Overriding {@link #markFailure(HoodieRecord, Throwable, Option)} to avoid caching. Catch all error records for
* error table
* {@link org.apache.hudi.common.model.HoodieKey} for successfully written hoodie records.
*/
@Override
public void markFailure(HoodieRecord record, Throwable t, Option<Map<String, String>> optionalRecordMetadata) {
failedRecords.add(record);
errors.put(record.getKey(), t);

totalRecords++;
totalErrorRecords++;
}

@Override
public boolean hasErrors() {
return totalErrorRecords > 0;
}

@Override
public long getTotalRecords() {
return super.getTotalRecords() + this.totalRecords;
return this.totalRecords;
}

@Override
public long getTotalErrorRecords() {
return totalErrorRecords;
}

@Override
public List<HoodieRecord> getFailedRecords() {
return failedRecords;
}

@Override
public HashMap<HoodieKey, Throwable> getErrors() {
return errors;
}

@Override
public boolean isErrored(HoodieKey key) {
return errors.containsKey(key);
}

@Override
public String toString() {
return "WriteStatus {" + "hasErrors='" + hasErrors() + '\'' +
", errorCount='" + totalErrorRecords + '\'' +
", errorPct='" + (100.0 * totalErrorRecords) / totalRecords + '\'' +
'}';
}
}

0 comments on commit 9da39cb

Please sign in to comment.