This repository was archived by the owner on Jan 14, 2025. It is now read-only.

Commit e71a0fb

Merge pull request #5 from atlanhq/error_table
Error table fixes
Akash Tandon authored Oct 16, 2019
2 parents badf599 + b423c91 commit e71a0fb
Showing 10 changed files with 90 additions and 37 deletions.
HoodieConfiguration.java
@@ -17,6 +17,7 @@
package com.uber.marmaray.common.configuration;

 import com.google.common.base.Optional;
+import com.uber.marmaray.common.sinks.hoodie.HoodieWriteStatus;
 import org.apache.hudi.WriteStatus;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.table.HoodieTableConfig;
@@ -44,7 +45,7 @@

/**
* {@link HoodieConfiguration} class holds hoodie configurations.
- *
+ * <p>
* All common properties start with {@link #HOODIE_COMMON_PROPERTY_PREFIX}.
* All table properties start with {@link #HOODIE_TABLES_PREFIX}.
*/
@@ -116,10 +117,10 @@ public class HoodieConfiguration implements Serializable {
// HOODIE_PARQUET_MAX_FILE_SIZE.
public static final long DEFAULT_HOODIE_TARGET_FILE_SIZE = FileUtils.ONE_GB;
/**
- * Write buffer limit in bytes to be used for bulk insert
- */
+ * Write buffer limit in bytes to be used for bulk insert
+ */
 public static final String HOODIE_INSERT_BUFFER_MEMORY_BYTES =
-HOODIE_COMMON_PROPERTY_PREFIX + "insert_buffer_memory_bytes";
+HOODIE_COMMON_PROPERTY_PREFIX + "insert_buffer_memory_bytes";
public static final int DEFAULT_HOODIE_INSERT_BUFFER_MEMORY_BYTES = (int) (32 * FileUtils.ONE_MB);

// Hoodie Compaction parameters
@@ -198,7 +199,7 @@ public class HoodieConfiguration implements Serializable {
* Hoodie Write status class
*/
public static final String HOODIE_WRITE_STATUS_CLASS = HOODIE_COMMON_PROPERTY_PREFIX + "write_status_class";
-public static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getCanonicalName();
+public static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = HoodieWriteStatus.class.getCanonicalName();
// Hoodie metrics config.
/**
* Hoodie metrics prefix
@@ -233,7 +234,7 @@ public class HoodieConfiguration implements Serializable {
private final Optional<String> version;

public HoodieConfiguration(@NonNull final Configuration conf, @NotEmpty final String tableKey,
-@NonNull final Optional<String> version) {
+@NonNull final Optional<String> version) {
this.conf = conf;
this.tableKey = tableKey;
this.version = version;
@@ -255,9 +256,10 @@ public List<String> getMandatoryProperties() {
/**
* @return hoodie base path directory
*/
-public String getBasePath() {
+public String getTablePath() {
 // HOODIE_BASE_PATH is a mandatory property. Please check {#getMandatoryProperties()}.
-return this.getConf().getProperty(getTablePropertyKey(HOODIE_BASE_PATH, this.tableKey)).get();
+return this.getConf().getProperty(getTablePropertyKey(HOODIE_BASE_PATH, this.tableKey)).get() + "/" +
+this.getTableName();
}

/**
@@ -294,14 +296,14 @@ public HoodieSink.HoodieSinkOp getHoodieSinkOp() {

/**
* @return hoodie metrics prefix.
- * */
+ */
public String getHoodieMetricsPrefix() {
return this.getConf().getProperty(getTablePropertyKey(HOODIE_METRICS_PREFIX, this.tableKey)).get();
}

public String getHoodieDataPartitioner(@NotEmpty final String defaultDataPartitioner) {
return this.getConf().getProperty(getTablePropertyKey(HOODIE_DATA_PARTITIONER, this.tableKey),
-defaultDataPartitioner);
+defaultDataPartitioner);
}

/**
@@ -380,7 +382,7 @@ public HoodieWriteConfig getHoodieWriteConfig() {
// This table name is used for sending metrics to graphite by hoodie. It expects table name to be without
// ".".
builder.forTable(getTableName().replaceAll("\\.", StringTypes.UNDERSCORE));
-builder.withPath(getBasePath());
+builder.withPath(getTablePath());
final boolean combineBeforeInsert =
getProperty(HOODIE_COMBINE_BEFORE_INSERT, DEFAULT_HOODIE_COMBINE_BEFORE_INSERT);
final boolean combineBeforeUpsert =
@@ -482,7 +484,7 @@ public HoodieWriteConfig getHoodieWriteConfig() {
* @param <T> DataType of the property
*/
public <T> T getProperty(@NotEmpty final String propertyKey,
-@NonNull final T defaultValue) {
+@NonNull final T defaultValue) {
final String defaultKey = getDefaultPropertyKey(propertyKey);
final String tableKey = getTablePropertyKey(propertyKey, this.tableKey);
final T retValue = Configuration.getProperty(this.conf, defaultKey, defaultValue);
@@ -574,13 +576,13 @@ public Builder withMetricsPrefix(@NotEmpty final String metricsPrefix) {

public Builder withCombineBeforeInsert(final boolean combineBeforeInsert) {
this.conf.setProperty(getTablePropertyKey(HOODIE_COMBINE_BEFORE_INSERT, tableKey),
-Boolean.toString(combineBeforeInsert));
+Boolean.toString(combineBeforeInsert));
return this;
}

public Builder withCombineBeforeUpsert(final boolean combineBeforeUpsert) {
this.conf.setProperty(getTablePropertyKey(HOODIE_COMBINE_BEFORE_UPSERT, tableKey),
-Boolean.toString(combineBeforeUpsert));
+Boolean.toString(combineBeforeUpsert));
return this;
}

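Note on the getBasePath() → getTablePath() rename above: the returned path now has the table name appended to the configured base path, which is why every call site in this commit switches to getTablePath(). A minimal sketch of the new behavior, assuming the builder methods withBasePath and withTableName that this repo's tests use (the table key and paths are illustrative):

    // Sketch only; not code from the commit.
    final Configuration conf = new Configuration();
    final HoodieConfiguration hoodieConf =
            HoodieConfiguration.newBuilder(conf, "errors")   // "errors" is an illustrative table key
                    .withBasePath("/data/hoodie")            // assumed builder method for HOODIE_BASE_PATH
                    .withTableName("error_table")            // assumed builder method for the table name
                    .enableMetrics(false)
                    .build();
    // Before this commit the getter returned "/data/hoodie";
    // getTablePath() now returns "/data/hoodie/error_table".
    final String tablePath = hoodieConf.getTablePath();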
DummyHoodieSinkDataConverter.java
@@ -29,10 +29,8 @@
* processed by the data converter.
*/
public class DummyHoodieSinkDataConverter extends HoodieSinkDataConverter {
-public DummyHoodieSinkDataConverter() {
-
-super(new Configuration(), new ErrorExtractor(), HoodieConfiguration.newBuilder(new Configuration(),
-"test").build());
+public DummyHoodieSinkDataConverter(@NonNull final HoodieConfiguration hoodieConf) {
+super(new Configuration(), new ErrorExtractor(), hoodieConf);
}

@Override
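With the hard-coded "test" table key gone, callers now build the dummy converter from the error table's own HoodieConfiguration (see the ErrorTableUtil change below). A sketch of the new call shape, with an illustrative table key:

    // Sketch only; "errors" is an illustrative table key.
    final HoodieConfiguration errorHoodieConf =
            HoodieConfiguration.newBuilder(new Configuration(), "errors").build();
    final DummyHoodieSinkDataConverter converter =
            new DummyHoodieSinkDataConverter(errorHoodieConf);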
HoodieBasedMetadataManager.java
@@ -191,11 +191,11 @@ public HashMap<String, String> getMetadataInfo() {
private static Map<String, String> readMetadataInfo(
@NonNull final HoodieConfiguration hoodieConf, @NonNull final HadoopConfiguration hadoopConf) {
try {
-final FileSystem fs = FSUtils.getFs(hoodieConf.getConf(), Optional.of(hoodieConf.getBasePath()));
+final FileSystem fs = FSUtils.getFs(hoodieConf.getConf(), Optional.of(hoodieConf.getTablePath()));
HoodieUtil.initHoodieDataset(fs, hadoopConf, hoodieConf);
final HoodieTableMetaClient hoodieTableMetaClient =
new HoodieTableMetaClient(new HadoopConfiguration(hoodieConf.getConf()).getHadoopConf(),
-hoodieConf.getBasePath(), true);
+hoodieConf.getTablePath(), true);
final HoodieActiveTimeline hoodieActiveTimeline = hoodieTableMetaClient.getActiveTimeline();
final Option<HoodieInstant> lastInstant = hoodieActiveTimeline.getCommitTimeline()
.filterCompletedInstants().lastInstant();
HoodieSink.java
@@ -194,7 +194,7 @@ public void write(@NonNull final RDDWrapper<HoodieRecord<HoodieRecordPayload>>
protected void initDataset() {
try {
HoodieUtil.initHoodieDataset(FSUtils.getFs(this.hoodieConf.getConf(),
-Optional.of(this.hoodieConf.getBasePath())), this.hadoopConf, this.hoodieConf);
+Optional.of(this.hoodieConf.getTablePath())), this.hadoopConf, this.hoodieConf);
} catch (IOException e) {
log.error("Error initializing hoodie dataset.", e);
throw new JobRuntimeException("Could not initialize hoodie dataset", e);
HoodieWriteStatus.java
@@ -17,18 +17,22 @@
package com.uber.marmaray.common.sinks.hoodie;

 import org.apache.hudi.WriteStatus;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;

/**
* Helper class to change default behavior for {@link WriteStatus}
*/
public class HoodieWriteStatus extends WriteStatus {

private long totalRecords;
+private long totalErrorRecords = 0;
+
+private final HashMap<HoodieKey, Throwable> errors = new HashMap<>();
+private final List<HoodieRecord> failedRecords = new ArrayList<>();

public HoodieWriteStatus(Boolean trackSuccessRecords, Double failureFraction) {
super(trackSuccessRecords, failureFraction);
@@ -43,8 +47,55 @@ public void markSuccess(final HoodieRecord record, final Option<Map<String, String>>
this.totalRecords++;
}

+/**
+ * Overriding {@link #markFailure(HoodieRecord, Throwable, Option)} to capture every failed
+ * record and its cause for the error table, instead of the sampled subset kept by
+ * {@link WriteStatus}.
+ */
+@Override
+public void markFailure(HoodieRecord record, Throwable t, Option<Map<String, String>> optionalRecordMetadata) {
+failedRecords.add(record);
+errors.put(record.getKey(), t);
+
+totalRecords++;
+totalErrorRecords++;
+}

+@Override
+public boolean hasErrors() {
+return totalErrorRecords > 0;
+}

@Override
public long getTotalRecords() {
-return super.getTotalRecords() + this.totalRecords;
+return this.totalRecords;
}

+@Override
+public long getTotalErrorRecords() {
+return totalErrorRecords;
+}
+
+@Override
+public List<HoodieRecord> getFailedRecords() {
+return failedRecords;
+}
+
+@Override
+public HashMap<HoodieKey, Throwable> getErrors() {
+return errors;
+}
+
+@Override
+public boolean isErrored(HoodieKey key) {
+return errors.containsKey(key);
+}
+
+@Override
+public String toString() {
+return "WriteStatus {" + "hasErrors='" + hasErrors() + '\'' +
+", errorCount='" + totalErrorRecords + '\'' +
+", errorPct='" + (100.0 * totalErrorRecords) / totalRecords + '\'' +
+'}';
+}
}
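The overridden markFailure now retains every failed record and its exception, instead of the failure-fraction-based sampling in Hudi's base WriteStatus, so the error table can capture all failures. A sketch of the resulting behavior, as it might appear inside a test method (record construction is illustrative; the payload is left null purely for brevity):

    import org.apache.hudi.common.model.HoodieKey;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieRecordPayload;
    import org.apache.hudi.common.util.Option;

    final HoodieKey key = new HoodieKey("record-1", "2019/10/16");
    final HoodieRecord<HoodieRecordPayload> record = new HoodieRecord<>(key, null); // null payload: sketch only
    final HoodieWriteStatus status = new HoodieWriteStatus(false, 0.0d);

    status.markFailure(record, new RuntimeException("write failed"), Option.empty());

    assert status.hasErrors();
    assert status.getTotalErrorRecords() == 1;
    assert status.isErrored(key);
    assert status.getFailedRecords().size() == 1; // every failure is retained, none sampled away

Note that because markFailure no longer delegates to super, getTotalRecords() now counts only records seen by this subclass, which is consistent with the change to its body above.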
KafkaToHoodieJob.java
@@ -15,8 +15,7 @@
import com.uber.marmaray.common.metadata.HoodieBasedMetadataManager;
import com.uber.marmaray.common.metadata.IMetadataManager;
import com.uber.marmaray.common.metrics.*;
-import com.uber.marmaray.common.reporters.ElasticsearchReporter;
-import com.uber.marmaray.common.reporters.PromPushGatewayReporter;
+import com.uber.marmaray.common.reporters.ConsoleReporter;
import com.uber.marmaray.common.reporters.Reporters;
import com.uber.marmaray.common.schema.kafka.KafkaSchemaJSONServiceReader;
import com.uber.marmaray.common.sinks.hoodie.HoodieSink;
@@ -78,8 +77,7 @@ private void run(final String[] args) throws IOException {
final Configuration conf = getConfiguration(args);

final Reporters reporters = new Reporters();
-// reporters.addReporter(new ConsoleReporter());
-reporters.addReporter(new ElasticsearchReporter("localhost", 9200, "marmaray_metrics"));
+reporters.addReporter(new ConsoleReporter());

final Map<String, String> metricTags = Collections.emptyMap();
final DataFeedMetrics dataFeedMetrics = new DataFeedMetrics("KafkaToHoodieJob", metricTags);
@@ -111,7 +109,8 @@ private void run(final String[] args) throws IOException {
final TimerMetric convertSchemaLatencyMs =
new TimerMetric(DataFeedMetricNames.CONVERT_SCHEMA_LATENCY_MS, metricTags);

-final Schema outputSchema = new Schema.Parser().parse(hoodieConf.getHoodieWriteConfig().getSchema());
+final String schema = "{\"namespace\": \"example.avro\", \"type\": \"record\", \"name\": \"Record\", \"fields\": [{\"name\": \"Region\", \"type\": \"string\"}, {\"name\": \"Country\", \"type\": \"string\"}] }";
+final Schema outputSchema = new Schema.Parser().parse(schema);
convertSchemaLatencyMs.stop();
reporters.report(convertSchemaLatencyMs);

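The job now parses a fixed two-field Avro schema instead of deriving it from the Hudi write config. For reference, a record matching that schema would be built as in this sketch (standard Avro API; field values are illustrative):

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    // Same schema JSON as in the diff above, split for readability.
    final Schema outputSchema = new Schema.Parser().parse(
            "{\"namespace\": \"example.avro\", \"type\": \"record\", \"name\": \"Record\","
                    + " \"fields\": [{\"name\": \"Region\", \"type\": \"string\"},"
                    + " {\"name\": \"Country\", \"type\": \"string\"}]}");
    final GenericRecord record = new GenericData.Record(outputSchema);
    record.put("Region", "EMEA");      // illustrative value
    record.put("Country", "Germany"); // illustrative value

Hard-coding the schema ties the job to this example feed; the previous config-driven lookup would still be needed for arbitrary topics.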
ErrorTableUtil.java
@@ -121,8 +121,8 @@ public static void writeErrorRecordsToErrorTable(@NonNull final SparkContext sc,
try {
final HoodieBasedMetadataManager metadataManager =
new HoodieBasedMetadataManager(hoodieConf, hadoopConf, shouldSaveChanges, jsc);
-final HoodieSink hoodieSink = new HoodieErrorSink(hoodieConf, hadoopConf, new DummyHoodieSinkDataConverter(),
-jsc, metadataManager,false);
+final HoodieSink hoodieSink = new HoodieErrorSink(hoodieConf, hadoopConf,
+new DummyHoodieSinkDataConverter(hoodieConf), jsc, metadataManager,false);

JavaRDD<GenericRecord> errorRecords = errorData.getData().map(error -> generateGenericErrorRecord(
errorExtractor, errorTableSchema, error, applicationId));
@@ -162,7 +162,7 @@ public static void initErrorTableDataset(@NonNull final Configuration conf, @NotEmpty
.enableMetrics(false)
.build();
final HadoopConfiguration hadopConf = new HadoopConfiguration(conf);
-HoodieUtil.initHoodieDataset(FSUtils.getFs(conf, Optional.of(hoodieConf.getBasePath())), hadopConf, hoodieConf);
+HoodieUtil.initHoodieDataset(FSUtils.getFs(conf, Optional.of(hoodieConf.getTablePath())), hadopConf, hoodieConf);
}

public static void addErrorSchemaConfiguration(
ErrorExtractor.java
@@ -57,6 +57,10 @@ public String getErrorSourceData(@NonNull final ErrorData errorData) {

@Override
public String getErrorException(@NonNull final ErrorData errorData) {
-return errorData.getErrMessage();
+String errorMessage = errorData.getErrMessage();
+if (errorMessage != null) {
+return errorMessage;
+}
+return "NA";
}
}
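The null guard added above is behaviorally equivalent to the one-liner below, shown only for comparison; the commit itself uses the explicit if/return form:

    // Comparison only; not the commit's code.
    return java.util.Optional.ofNullable(errorData.getErrMessage()).orElse("NA");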
HoodieUtil.java
@@ -16,7 +16,6 @@
*/
package com.uber.marmaray.utilities;

-import com.google.common.base.Optional;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
@@ -51,13 +50,13 @@ private HoodieUtil() {
*/
public static void initHoodieDataset(@NonNull final FileSystem fs, @NonNull final HadoopConfiguration hadoopConf,
@NonNull final HoodieConfiguration hoodieConf) throws IOException {
-final Path hoodieMetaFolder = new Path(hoodieConf.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME);
+final Path hoodieMetaFolder = new Path(hoodieConf.getTablePath(), HoodieTableMetaClient.METAFOLDER_NAME);
final Path hoodiePropertiesFile = new Path(hoodieMetaFolder.toString(),
HoodieTableConfig.HOODIE_PROPERTIES_FILE);
if (!fs.exists(hoodiePropertiesFile)) {
HoodieTableMetaClient
.initDatasetAndGetMetaClient(hadoopConf.getHadoopConf(),
-hoodieConf.getBasePath(), hoodieConf.getHoodieInitProperties());
+hoodieConf.getTablePath(), hoodieConf.getHoodieInitProperties());
}
}

TestErrorTableConfiguration.java
@@ -51,7 +51,7 @@ public void testErrorTableConfiguration() {

HoodieConfiguration hoodieConfiguration = createHoodieConfiguration(conf);

-Assert.assertEquals(hoodieConfiguration.getBasePath(), errorBasePath);
+Assert.assertEquals(hoodieConfiguration.getTablePath(), errorBasePath);
Assert.assertEquals(hoodieConfiguration.getTableName(), errorTableName);
Assert.assertEquals(hoodieConfiguration.getHoodieMetricsPrefix(), errorMetricsPrefix);
}
