From 9da39cb16afd470b77d2b6c2af7059ad7c46cace Mon Sep 17 00:00:00 2001 From: Amit Prabhu Date: Tue, 15 Oct 2019 17:52:01 +0530 Subject: [PATCH] Fix HoodieWriteStatus to track all failed records for error table --- .../configuration/HoodieConfiguration.java | 3 +- .../sinks/hoodie/HoodieWriteStatus.java | 57 ++++++++++++++++++- 2 files changed, 56 insertions(+), 4 deletions(-) diff --git a/marmaray/src/main/java/com/uber/marmaray/common/configuration/HoodieConfiguration.java b/marmaray/src/main/java/com/uber/marmaray/common/configuration/HoodieConfiguration.java index 9297ed2..3eeca37 100644 --- a/marmaray/src/main/java/com/uber/marmaray/common/configuration/HoodieConfiguration.java +++ b/marmaray/src/main/java/com/uber/marmaray/common/configuration/HoodieConfiguration.java @@ -17,6 +17,7 @@ package com.uber.marmaray.common.configuration; import com.google.common.base.Optional; +import com.uber.marmaray.common.sinks.hoodie.HoodieWriteStatus; import org.apache.hudi.WriteStatus; import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.table.HoodieTableConfig; @@ -198,7 +199,7 @@ public class HoodieConfiguration implements Serializable { * Hoodie Write status class */ public static final String HOODIE_WRITE_STATUS_CLASS = HOODIE_COMMON_PROPERTY_PREFIX + "write_status_class"; - public static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getCanonicalName(); + public static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = HoodieWriteStatus.class.getCanonicalName(); // Hoodie metrics config. /** * Hoodie metrics prefix diff --git a/marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/HoodieWriteStatus.java b/marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/HoodieWriteStatus.java index 338eec5..d7f8671 100644 --- a/marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/HoodieWriteStatus.java +++ b/marmaray/src/main/java/com/uber/marmaray/common/sinks/hoodie/HoodieWriteStatus.java @@ -17,11 +17,11 @@ package com.uber.marmaray.common.sinks.hoodie; import org.apache.hudi.WriteStatus; +import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; -import java.util.Map; -import java.util.Optional; +import java.util.*; /** * Helper class to change default behavior for {@link WriteStatus} @@ -29,6 +29,10 @@ public class HoodieWriteStatus extends WriteStatus { private long totalRecords; + private long totalErrorRecords = 0; + + private final HashMap errors = new HashMap<>(); + private final List failedRecords = new ArrayList<>(); public HoodieWriteStatus(Boolean trackSuccessRecords, Double failureFraction) { super(trackSuccessRecords, failureFraction); @@ -43,8 +47,55 @@ public void markSuccess(final HoodieRecord record, final Option> optionalRecordMetadata) { + failedRecords.add(record); + errors.put(record.getKey(), t); + + totalRecords++; + totalErrorRecords++; + } + + @Override + public boolean hasErrors() { + return totalErrorRecords > 0; + } + @Override public long getTotalRecords() { - return super.getTotalRecords() + this.totalRecords; + return this.totalRecords; + } + + @Override + public long getTotalErrorRecords() { + return totalErrorRecords; + } + + @Override + public List getFailedRecords() { + return failedRecords; + } + + @Override + public HashMap getErrors() { + return errors; + } + + @Override + public boolean isErrored(HoodieKey key) { + return errors.containsKey(key); + } + + @Override + public String toString() { + return "WriteStatus {" + "hasErrors='" + hasErrors() + '\'' + + ", errorCount='" + totalErrorRecords + '\'' + + ", errorPct='" + (100.0 * totalErrorRecords) / totalRecords + '\'' + + '}'; } }