From 0018797ae90bf350b96a51d6c2b43e79a065b6aa Mon Sep 17 00:00:00 2001 From: fhan Date: Mon, 21 Oct 2024 20:47:19 +0800 Subject: [PATCH 1/5] [HUDI-8400] apply write.ignore.failed when write data failed --- .../apache/hudi/config/HoodieWriteConfig.java | 18 ++++++++++++++++++ .../storage/row/HoodieRowDataCreateHandle.java | 5 +++++ .../hudi/configuration/FlinkOptions.java | 1 + .../apache/hudi/util/FlinkWriteClients.java | 3 ++- 4 files changed, 26 insertions(+), 1 deletion(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 09df47b44521c..dd5ee628f54cc 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -811,6 +811,12 @@ public class HoodieWriteConfig extends HoodieConfig { */ public static final String WRITES_FILEID_ENCODING = "_hoodie.writes.fileid.encoding"; + public static final ConfigProperty IGNORE_FAILED = ConfigProperty + .key("hoodie.write.ignore.failed") + .defaultValue(false) + .sinceVersion("") + .withDocumentation("Flag to indicate whether to ignore any non exception error (e.g. writestatus error)."); + private ConsistencyGuardConfig consistencyGuardConfig; private FileSystemRetryConfig fileSystemRetryConfig; @@ -2788,6 +2794,13 @@ public int getSecondaryIndexParallelism() { return metadataConfig.getSecondaryIndexParallelism(); } + /** + * Whether to ignore the write failed. 
+ */ + public boolean getIgnoreWriteFailed() { + return getBooleanOrDefault(IGNORE_FAILED); + } + public static class Builder { protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig(); @@ -3330,6 +3343,11 @@ public Builder withWriteRecordPositionsEnabled(boolean shouldWriteRecordPosition return this; } + public Builder withWriteIgnoreFailed(boolean ignoreFailedWriteData) { + writeConfig.setValue(IGNORE_FAILED, String.valueOf(ignoreFailedWriteData)); + return this; + } + protected void setDefaults() { writeConfig.setDefaultValue(MARKERS_TYPE, getDefaultMarkersType(engineType)); // Check for mandatory properties diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java index b08e814d15c5a..dce64fce11b9e 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java @@ -31,6 +31,7 @@ import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieInsertException; import org.apache.hudi.storage.HoodieStorage; @@ -142,7 +143,11 @@ public void write(String recordKey, String partitionPath, RowData record) throws ? 
HoodieRecordDelegate.create(recordKey, partitionPath, null, newRecordLocation) : null; writeStatus.markSuccess(recordDelegate, Option.empty()); } catch (Throwable t) { + LOG.error("Failed to write : key is " + recordKey + ", data is " + rowData, t); writeStatus.markFailure(recordKey, partitionPath, t); + if (!writeConfig.getIgnoreWriteFailed()) { + throw new HoodieException(t.getMessage(), t); + } } } catch (Throwable ge) { writeStatus.setGlobalError(ge); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index cc7f5f6783ad8..8d4a3f78b7e0f 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -439,6 +439,7 @@ private FlinkOptions() { .key("write.ignore.failed") .booleanType() .defaultValue(false) + .withFallbackKeys("hoodie.write.ignore.failed") .withDescription("Flag to indicate whether to ignore any non exception error (e.g. writestatus error). within a checkpoint batch. \n" + "By default false. Turning this on, could hide the write status errors while the flink checkpoint moves ahead. 
\n" + "So, would recommend users to use this with caution."); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java index e9d0310d4756d..f25367392fe6a 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java @@ -225,7 +225,8 @@ public static HoodieWriteConfig getHoodieClientConfig( .withAutoCommit(false) .withAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) .withProps(flinkConf2TypedProperties(conf)) - .withSchema(getSourceSchema(conf).toString()); + .withSchema(getSourceSchema(conf).toString()) + .withWriteIgnoreFailed(conf.get(FlinkOptions.IGNORE_FAILED)); Option lockConfig = getLockConfig(conf); if (lockConfig.isPresent()) { From ac53f4c1e6a0d047f7a94865c231a1255d34342e Mon Sep 17 00:00:00 2001 From: fhan Date: Tue, 22 Oct 2024 16:49:35 +0800 Subject: [PATCH 2/5] add ignoreWriteFailed() in HoodieWriteHandle and call in sub-class when markFailure --- .../main/java/org/apache/hudi/io/HoodieAppendHandle.java | 1 + .../main/java/org/apache/hudi/io/HoodieCreateHandle.java | 1 + .../main/java/org/apache/hudi/io/HoodieMergeHandle.java | 2 ++ .../main/java/org/apache/hudi/io/HoodieWriteHandle.java | 7 +++++++ 4 files changed, 11 insertions(+) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index cb0c7dd283fab..b70415e06f703 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -499,6 +499,7 @@ protected void doWrite(HoodieRecord record, Schema schema, TypedProperties props // for 
a single record writeStatus.markFailure(record, t, recordMetadata); LOG.error("Error writing record " + record, t); + ignoreWriteFailed(t); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java index 12406927ae613..93f4716fae7c3 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java @@ -170,6 +170,7 @@ protected void doWrite(HoodieRecord record, Schema schema, TypedProperties props // for a single record writeStatus.markFailure(record, t, recordMetadata); LOG.error("Error writing record " + record, t); + ignoreWriteFailed(t); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 1bf6f6b013894..f1c1473dc64f0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -304,6 +304,7 @@ private boolean writeRecord(HoodieRecord newRecord, Option comb HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: " + newRecord.getPartitionPath() + " but trying to insert into partition: " + partitionPath); writeStatus.markFailure(newRecord, failureEx, recordMetadata); + ignoreWriteFailed(failureEx); return false; } try { @@ -333,6 +334,7 @@ private boolean writeRecord(HoodieRecord newRecord, Option comb } catch (Exception e) { LOG.error("Error writing record " + newRecord, e); writeStatus.markFailure(newRecord, e, recordMetadata); + ignoreWriteFailed(e); } return false; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index a251a5f761442..af1cb2bed8d5f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ -52,6 +52,7 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.Map; import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty; @@ -174,6 +175,12 @@ protected void doWrite(HoodieRecord record, Schema schema, TypedProperties props // NO_OP } + protected void ignoreWriteFailed(Throwable throwable) { + if (config.getIgnoreWriteFailed()) { + throw new HoodieException(throwable.getMessage(), throwable); + } + } + /** * Perform the actual writing of the given record into the backing file. */ From 42c43554accffa28d2bb11c2d02c81950126e5a7 Mon Sep 17 00:00:00 2001 From: fhan Date: Tue, 22 Oct 2024 16:55:47 +0800 Subject: [PATCH 3/5] fix checkstyle --- .../src/main/java/org/apache/hudi/io/HoodieWriteHandle.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index af1cb2bed8d5f..efb266dd0c434 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ -52,7 +52,6 @@ import java.io.IOException; import java.util.Collections; import java.util.List; -import java.util.Map; import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty; From 592d155ef0032644780b7d0191177f249a07a400 Mon Sep 17 00:00:00 2001 From: fhan Date: Wed, 23 Oct 2024 16:34:32 +0800 Subject: [PATCH 4/5] add logs --- .../src/main/java/org/apache/hudi/io/HoodieWriteHandle.java | 4 ++++ 1 file changed, 4 insertions(+) diff 
--git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index efb266dd0c434..ee4c0db6ce886 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ -174,6 +174,10 @@ protected void doWrite(HoodieRecord record, Schema schema, TypedProperties props // NO_OP } + /** + * Rethrows the given write failure as a HoodieException unless the 'hoodie.write.ignore.failed' config is enabled. + * @param throwable + */ protected void ignoreWriteFailed(Throwable throwable) { if (config.getIgnoreWriteFailed()) { throw new HoodieException(throwable.getMessage(), throwable); From 7a74ac51516058cec4b9eae1ba260926089f454b Mon Sep 17 00:00:00 2001 From: fhan Date: Fri, 25 Oct 2024 17:27:28 +0800 Subject: [PATCH 5/5] bug fix --- .../src/main/java/org/apache/hudi/io/HoodieWriteHandle.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index ee4c0db6ce886..9aad2ecdf2b38 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ -179,7 +179,7 @@ protected void doWrite(HoodieRecord record, Schema schema, TypedProperties props * @param throwable */ protected void ignoreWriteFailed(Throwable throwable) { - if (config.getIgnoreWriteFailed()) { + if (!config.getIgnoreWriteFailed()) { throw new HoodieException(throwable.getMessage(), throwable); } }