Skip to content
This repository was archived by the owner on Jan 14, 2025. It is now read-only.

Commit

Permalink
Update from latest Uber internal (uber#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
esmioley authored and dannyhchen committed Apr 8, 2019
1 parent afef8a8 commit 8e3ec73
Show file tree
Hide file tree
Showing 174 changed files with 7,401 additions and 1,651 deletions.
1 change: 1 addition & 0 deletions marmaray-tools/checkstyles
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package com.uber.marmaray.tools;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.uber.marmaray.common.configuration.Configuration;
Expand Down Expand Up @@ -63,7 +64,7 @@ public static void main(final String[] args) throws ParseException, IOException
log.info("Printing contents of metadata file: " + metadataFilePath);

final Configuration conf = new Configuration();
final FileSystem fs = FSUtils.getFs(conf);
final FileSystem fs = FSUtils.getFs(conf, Optional.absent());
try (final InputStream is = new BufferedInputStream(fs.open(new Path(metadataFilePath)))) {
try (final ObjectInputStream input = new ObjectInputStream(is)) {
final Map<String, StringValue> metadataMap = HDFSMetadataManager.deserialize(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import com.uber.marmaray.common.metadata.HDFSMetadataManager;
import com.uber.marmaray.utilities.CommandLineUtil;
import com.uber.marmaray.utilities.FSUtils;

import com.google.common.base.Optional;

import java.io.IOException;
import java.util.Comparator;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -78,7 +81,7 @@ public static void main(final String[] args) throws ParseException, IOException
: false;

final Configuration conf = new Configuration();
final FileSystem fs = FSUtils.getFs(conf);
final FileSystem fs = FSUtils.getFs(conf, Optional.absent());

if (fs.isDirectory(metadataPath)) {
final FileStatus[] fileStatuses = fs.listStatus(metadataPath);
Expand Down
1 change: 1 addition & 0 deletions marmaray/checkstyles
129 changes: 101 additions & 28 deletions marmaray/src/main/java/com/uber/marmaray/common/AvroPayload.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@
import com.google.common.base.Preconditions;
import com.uber.marmaray.common.data.IData;
import com.uber.marmaray.utilities.SparkUtil;
import lombok.AllArgsConstructor;
import lombok.NonNull;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.hibernate.validator.constraints.NotEmpty;
import scala.reflect.ClassManifestFactory;
import scala.reflect.ClassTag;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

Expand All @@ -43,45 +44,117 @@
@Slf4j
public class AvroPayload implements IPayload<GenericRecord>, IData, Serializable {

private static final ClassTag<GenericRecord> recordClassTag = ClassManifestFactory.fromClass(GenericRecord.class);
@NonNull
private final Map<String, Object> rootFields = new HashMap<>();
@NonNull
private final byte[] byteRecord;
private IAvroPayloadInternal payloadInterval;

public AvroPayload(@NonNull final GenericRecord record) {
this.byteRecord = SparkUtil.serialize(record, recordClassTag);
for (final Schema.Field f : record.getSchema().getFields()) {
if (!RECORD.equals(f.schema().getType())) {
this.rootFields.put(f.name(), record.get(f.name()));
}
}
this(record, true);
}

public AvroPayload(@NonNull final GenericRecord record,
@NonNull final List<String> fieldsToCache) {
this.byteRecord = SparkUtil.serialize(record, recordClassTag);
for (final String f : fieldsToCache) {
this.rootFields.put(f, record.get(f));
}
this.payloadInterval = new SerializedAvroPayloadInternal(record, fieldsToCache);
}

/**
* Avoid calling it to fetch top level record fields.
*/
public AvroPayload(@NonNull final GenericRecord record, final boolean serializeRecord) {
    // Choose the backing representation: a serialized copy of the record, or the record object itself.
    if (serializeRecord) {
        this.payloadInterval = new SerializedAvroPayloadInternal(record);
    } else {
        this.payloadInterval = new AvroPayloadInternal(record);
    }
}

@Override
public GenericRecord getData() {
return SparkUtil.deserialize(this.byteRecord, recordClassTag);
return this.payloadInterval.getData();
}

/**
* Returns the value of the given field by delegating to the internal payload representation.
* Which fields can be fetched depends on that representation.
*
* @param fieldName name of the field to fetch.
* @return whatever the internal payload representation returns for that field.
*/
public Object getField(@NotEmpty final String fieldName) {
return this.payloadInterval.getField(fieldName);
}

/**
* Lists {@link AvroPayload} together with its internal payload interface and both of its implementations.
* NOTE(review): presumably used to register these classes with a serializer - confirm against callers.
*
* @return fixed-size list holding the four classes above.
*/
public static List<Class> getSerializationClasses() {
return Arrays.asList(AvroPayload.class,
IAvroPayloadInternal.class,
AvroPayloadInternal.class,
SerializedAvroPayloadInternal.class);
}

/**
* Internal representation of the record held by an {@link AvroPayload}.
* {@link AvroPayload#getData()} and {@link AvroPayload#getField(String)} delegate to an implementation of it.
*/
private interface IAvroPayloadInternal {

/**
* Returns the {@link GenericRecord} held by this payload. Depending on the implementation this is either
* the stored record object itself or a record rebuilt from its serialized bytes.
*/
GenericRecord getData();

/**
* Returns the value of a field stored at the root level of the record.
*
* @param fieldName name of the field at the root level of the record.
*/
Object getField(@NotEmpty final String fieldName);
}

/**
* {@link IAvroPayloadInternal} implementation that keeps a reference to the {@link GenericRecord} itself;
* it does not serialize the record or cache any of its fields separately.
*/
@AllArgsConstructor
private static class AvroPayloadInternal implements IAvroPayloadInternal {

// The record this payload wraps; returned as-is by getData().
@NonNull
private final GenericRecord record;

/**
* Returns the record instance held by this object.
*/
@Override
public GenericRecord getData() {
return this.record;
}

/**
* Looks the field up directly on the wrapped record.
*
* @param fieldName name of the field to read from the record.
*/
@Override
public Object getField(@NotEmpty final String fieldName) {
return this.record.get(fieldName);
}
}

/**
* It only supports fetching fields at the root level of the record which are of type other than
* {@link org.apache.avro.generic.GenericData.Record}.
*
* @param fieldName name of the field at the root level of the record.
* It internally stores AvroPayload as byte[] to reduce memory footprint.
*/
public Object getField(@NotEmpty final String fieldName) {
Preconditions.checkState(this.rootFields.containsKey(fieldName),
"field is not cached at root level :" + fieldName);
return this.rootFields.get(fieldName);
private static class SerializedAvroPayloadInternal implements IAvroPayloadInternal {

private final Map<String, Object> rootFields;
private final byte[] byteRecord;

public SerializedAvroPayloadInternal(@NonNull final GenericRecord record) {
this(record, getFieldsToCache(record));
}

public SerializedAvroPayloadInternal(@NonNull final GenericRecord record,
@NonNull final List<String> fieldsToCache) {
this.byteRecord = SparkUtil.serialize(record, SparkUtil.GENERIC_RECORD_CLASS_TAG);
this.rootFields = new HashMap<>();
for (final String f : fieldsToCache) {
this.rootFields.put(f, record.get(f));
}
}

/**
* Avoid calling it to fetch top level record fields.
*/
public GenericRecord getData() {
return SparkUtil.deserialize(this.byteRecord, SparkUtil.GENERIC_RECORD_CLASS_TAG);
}

/**
* It only supports fetching fields at the root level of the record which are of type other than
* {@link org.apache.avro.generic.GenericData.Record}.
*
* @param fieldName name of the field at the root level of the record.
*/
public Object getField(@NotEmpty final String fieldName) {
Preconditions.checkState(this.rootFields.containsKey(fieldName),
"field is not cached at root level :" + fieldName);
return this.rootFields.get(fieldName);
}

private static List<String> getFieldsToCache(@NonNull final GenericRecord record) {
final List<String> fieldsToCache = new LinkedList<>();
for (final Schema.Field f : record.getSchema().getFields()) {
if (!RECORD.equals(f.schema().getType())) {
fieldsToCache.add(f.name());
}
}
return fieldsToCache;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/* Copyright (c) 2018 Uber Technologies, Inc.
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions
* of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
* THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/

package com.uber.marmaray.common;

/**
* {@link DispersalLengthType} indicates whether dispersed data is from one single day or not.
* <ol>
* <li>{@link #SINGLE_DAY}</li>
* <li>{@link #MULTIPLE_DAY}</li>
* </ol>
*/
public enum DispersalLengthType {
// Dispersed data is from one single day.
SINGLE_DAY,
// Dispersed data is not from one single day.
MULTIPLE_DAY
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2018 Uber Technologies, Inc.
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions
* of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
* THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
package com.uber.marmaray.common;

/**
* {@link MetadataManagerType} defines the metadata manager type.
* <ul>
* <li>{@link #HDFS}: presumably the "normal", on-disk metadata manager - confirm</li>
* <li>{@link #CASSANDRA}: Cassandra based metadata manager</li>
* <li>{@link #MULTI}: NOTE(review): previously undocumented; presumably combines more than one metadata
* manager - confirm against its usages</li>
* </ul>
*/
public enum MetadataManagerType {
HDFS,
CASSANDRA,
MULTI
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,22 +52,29 @@ public final class JobDagActions {
* Actions are executed in parallel; execution status will not affect the others.
*/

public static final String DEFAULT_NAME = "anonymous";
public static final String DEFAULT_TARGET = "anonymous";

/**
* The target for the JobDagActions.
* It is equal to datafeed/topic in the context of ingestion jobDag.
* The target could be something else, it's 'JobManager' for job manager actions.
*/
private static final String ACTION_TARGET = "action_target";

@Getter
private final Queue<IJobDagAction> actions;
private final Reporters reporters;

@Getter
private final String name;
private final String target;

public JobDagActions(@NonNull final Reporters reporters) {
this(reporters, DEFAULT_NAME);
this(reporters, DEFAULT_TARGET);
}

public JobDagActions(@NonNull final Reporters reporters, @NotEmpty final String name) {
public JobDagActions(@NonNull final Reporters reporters, @NotEmpty final String target) {
this.actions = new ConcurrentLinkedDeque<>();
this.name = name;
this.target = target;
this.reporters = reporters;
}

Expand Down Expand Up @@ -115,7 +122,7 @@ public boolean execute(final boolean dagSuccess) {
try {
actionSuccess.set(future.get());
} catch (Exception e) {
log.error("Error running JobDagAction {} for {}:", action.getClass(), this.getName(), e);
log.error("Error running JobDagAction {} for {}:", action.getClass(), this.getTarget(), e);
actionSuccess.set(false);
successful.set(false);
}
Expand All @@ -130,12 +137,14 @@ public boolean execute(final boolean dagSuccess) {
/**
* Reports how long an action took to every configured reporter.
* The duration is converted from milliseconds to whole seconds, and the metric carries the action's own
* tags plus the target of this {@link JobDagActions}.
*/
private void reportExecuteTime(@NonNull final IJobDagAction action, final long timeInMillis) {
    final long elapsedSeconds = TimeUnit.MILLISECONDS.toSeconds(timeInMillis);
    final LongMetric executeTimeMetric = new LongMetric(TIME_METRIC, elapsedSeconds);
    executeTimeMetric.addTags(action.getMetricTags());
    executeTimeMetric.addTag(ACTION_TARGET, this.getTarget());
    this.reporters.getReporters().stream().forEach(reporter -> reporter.gauge(executeTimeMetric));
}

/**
* Reports the outcome of an action (success or failure value) to every configured reporter.
* The metric carries the action's own tags plus the target of this {@link JobDagActions}.
*/
private void reportActionStatus(@NonNull final IJobDagAction action, final boolean isSuccess) {
    final LongMetric statusMetric =
        new LongMetric(RESULT_METRIC, isSuccess ? RESULT_SUCCESS : RESULT_FAILURE);
    statusMetric.addTags(action.getMetricTags());
    statusMetric.addTag(ACTION_TARGET, this.getTarget());
    this.reporters.getReporters().stream().forEach(reporter -> reporter.gauge(statusMetric));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ public boolean execute(final boolean success) {
this.dataFeedMetrics.gaugeAll(reporter);
});
} else {
log.warn("No metrics produced or actions being executed on reporter because errors were encountered");
this.reporters.getReporters().forEach(reporter -> this.dataFeedMetrics.gauageFailureMetric(reporter));
log.warn("Other than failure reports "
+ "no metrics produced or actions being executed on reporter because errors were encountered");
}
return success;
}
Expand Down
Loading

0 comments on commit 8e3ec73

Please sign in to comment.