Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<module>proto</module>
<module>tracedsl</module>
<module>spark</module>
<module>testcontainers</module>
<!-- <module>testcontainers</module> -->
</modules>

<properties>
Expand All @@ -29,7 +29,7 @@
<version.scala>2.12</version.scala>
<version.kafka>2.3.0</version.kafka>
<version.prometheus>0.7.0</version.prometheus>
<version.testcontainers>1.13.0</version.testcontainers>
<!-- <version.testcontainers>1.13.0</version.testcontainers> -->
<version.junit>4.11</version.junit>
<version.awaitility>4.0.2</version.awaitility>
<version.maven-shade-plugin>3.1.0</version.maven-shade-plugin>
Expand All @@ -53,11 +53,12 @@
<artifactId>jaeger-spark</artifactId>
<version>${project.version}</version>
</dependency>
<!--
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>jaeger-testcontainers</artifactId>
<version>${project.version}</version>
</dependency>
</dependency> -->

<dependency>
<groupId>io.jaegertracing</groupId>
Expand Down
7 changes: 6 additions & 1 deletion spark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.14.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${version.scala}</artifactId>
Expand Down Expand Up @@ -79,7 +84,7 @@
</transformer>
</transformers>
<!-- spark + scala + ... = lots and lots of classes! -->
<minimizeJar>true</minimizeJar>
<!-- <minimizeJar>true</minimizeJar> -->
<filters>
<!-- Prevent minification from excluding classes looked up by name -->
<filter>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import java.util.zip.GZIPInputStream;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;

/**
* @author Pavol Loffay
*/
public class SparkKinesisRunner {

private static final Logger logger = Logger.getLogger(SparkKinesisRunner.class.getName());
/**
* System Properties that can be passed
* SPARK_MASTER
Expand All @@ -63,14 +65,27 @@ public class SparkKinesisRunner {
* @throws IOException
*/
public static void main(String[] args) throws InterruptedException, IOException {
PropertyConfigurator.configure("/data/log4j.properties"); // can make it configurable

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we have to do it like this? By default, Log4j2 looks for a config file specified in a predefined system property. We can point that property at our log4j2.xml or log4j2.yml file when we run this jar.

<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>2.14.1</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.14.1</version>
</dependency>

https://logging.apache.org/log4j/2.x/manual/configuration.html

//@transient lazy logger = LogManager.getRootLogger();
HTTPServer server = new HTTPServer(getPropOrEnv("PROMETHEUS_HOST", "localhost"), Integer.parseInt(getPropOrEnv("PROMETHEUS_PORT", "9111")));

JsonUtil.init(new ObjectMapper());

SparkConf sparkConf = new SparkConf()
.setAppName("Trace DSL")
.setMaster(getPropOrEnv("SPARK_MASTER", "local[*]"))
.set("spark.testing.memory", getPropOrEnv("SPARK_MEMORY", "471859200"));
.set("spark.testing.memory", getPropOrEnv("SPARK_MEMORY", "4073741824"))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move all of these default values into named constants and reference them here.

.set("spark.default.parallelism", getPropOrEnv("SPARK_DEFAULT_PARALLELISM", "30"))
.set("spark.executor.cores", getPropOrEnv("SPARK_EXECUTOR_CORES", "1"))
.set("spark.driver.cores", getPropOrEnv("SPARK_DRIVER_CORES", "1"))
.set("spark.executor.instances", getPropOrEnv("SPARK_EXECUTOR_INSTANCES", "15"))
.set("spark.serializer", getPropOrEnv("SPARK_SERIALIZER", "org.apache.spark.serializer.KryoSerializer"));

/* other explored values of settings
.set("spark.cores.max", getPropOrEnv("SPARK_CORES_MAX", "16"))
.set("spark.default.parallelism", getPropOrEnv("SPARK_DEFAULT_PARALLELISM", "16"))
.set("spark.executor.cores", getPropOrEnv("SPARK_EXECUTOR_CORES", "12"))
*/

JavaSparkContext sc = new JavaSparkContext(sparkConf);
long batchInterval = Integer.parseInt(getPropOrEnv("SPARK_STREAMING_BATCH_DURATION", "10000"));
Expand All @@ -85,27 +100,30 @@ public static void main(String[] args) throws InterruptedException, IOException
String region = Regions.getCurrentRegion()!=null ? Regions.getCurrentRegion().getName()
: getPropOrEnv("AWS_REGION", Regions.US_EAST_1.getName());

String service_endpoint = getPropOrEnv("KINESIS_ENDPOINT", "https://kinesis.us-east-1.amazonaws.com");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Throw an error if the endpoint is not provided. We should not silently fall back to reading from a different region.


InitialPositionInStream initialPosition;
try {
initialPosition = InitialPositionInStream
.valueOf(getPropOrEnv("KINESIS_STREAM_POSITION", "TRIM_HORIZON"));
.valueOf(getPropOrEnv("KINESIS_STREAM_POSITION", "LATEST"));
} catch (IllegalArgumentException e) {
initialPosition = InitialPositionInStream.valueOf("TRIM_HORIZON");
initialPosition = InitialPositionInStream.valueOf("LATEST");
}


String applicationName = (getPropOrEnv("SPARK_STREAMING_BATCH_DURATION", "10000"));
KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
.streamingContext(ssc)
.regionName(region)
.streamName(getPropOrEnv("KINESIS_STREAM", "common_haystack_traces"))
.initialPosition(KinesisInitialPositions.fromKinesisInitialPosition(initialPosition))
.checkpointAppName("trace-analytics")
.checkpointAppName(getPropOrEnv("DYNAMO_TABLE", "trace-analytics"))
.checkpointInterval(Duration.apply(60 * 1000))
.build();

JavaDStream<byte[]> dStream = JavaDStream.fromDStream(kinesisStream, ClassTag$.MODULE$.apply(byte[].class));

long windowInterval = Integer.parseInt(getPropOrEnv("SPARK_STREAMING_WINDOW_DURATION", "60000"));
JavaDStream<Span> spanStream = dStream.window(Duration.apply(windowInterval)).flatMap((FlatMapFunction<byte[], Span>) kinesisRecord -> {
// TO DO can put windowing if needed later
//long windowInterval = Integer.parseInt(getPropOrEnv("SPARK_STREAMING_WINDOW_DURATION", "60000"));
JavaDStream<Span> spanStream = dStream.flatMap((FlatMapFunction<byte[], Span>) kinesisRecord -> {
String payload = new String(decompress(kinesisRecord), StandardCharsets.UTF_8);
String[] records = payload.split(System.lineSeparator());
List<Span> spanList = new LinkedList<>();
Expand All @@ -120,8 +138,13 @@ public static void main(String[] args) throws InterruptedException, IOException
span.serviceName = span.process.serviceName;
spanList.add(span);
}
Map<String,String> tags = new HashMap<>();
for (Map.Entry<String, String> tag: span.tag.entrySet()) {
tags.put (tag.getKey().replace("@","."),tag.getValue());
}
span.tag = tags;
} catch (Exception e) {
System.out.println("Exception for record : "+record);
logger.error("Exception for record : "+record);
e.printStackTrace();
}
}
Expand All @@ -130,7 +153,7 @@ public static void main(String[] args) throws InterruptedException, IOException

JavaPairDStream<String, Span> traceIdSpanTuple = spanStream.mapToPair(record -> new Tuple2<>(record.traceID, record));
JavaDStream<Trace> tracesStream = traceIdSpanTuple.groupByKey().map(traceIdSpans -> {
System.out.printf("TraceID: %s\n", traceIdSpans._1);
logger.info("TraceID: "+traceIdSpans._1);
Iterable<Span> spans = traceIdSpans._2();
Trace trace = new Trace();
trace.traceId = traceIdSpans._1();
Expand All @@ -140,12 +163,13 @@ public static void main(String[] args) throws InterruptedException, IOException
});

List<ModelRunner> modelRunner = Arrays.asList(
new TraceHeight(),
new ServiceDepth(),
new ServiceHeight(),
/* Removing problematic metrics*/
//new TraceHeight(),
//new ServiceDepth(),
//new ServiceHeight(),
new NetworkLatency(),
new NumberOfErrors(),
new DirectDependencies(),
//new NumberOfErrors(),
//new DirectDependencies(),
// trace quality
new HasClientServerSpans(),
new UniqueSpanId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
public class GraphCreator {

private GraphCreator() {}

private static final Counter ORPHAN_SPAN_COUNTER = Counter.build()
/* private static final Counter ORPHAN_SPAN_COUNTER = Counter.build()
.name("orphan_span_total")
.help("Number of orphan spans within single trace")
.labelNames("service", "operation")
Expand All @@ -40,7 +39,7 @@ private GraphCreator() {}
.help("Number of spans in trace with root span")
.create()
.register();

*/
public static Graph create(Trace trace) {
TinkerGraph graph = TinkerGraph.open();

Expand All @@ -67,7 +66,7 @@ public static Graph create(Trace trace) {
vertex.property(tag.key.replace("@","."), tag.value);
});
}

/*
boolean hasOrphanSpan = false;
for (Span span: trace.spans) {
Vertex vertex = vertexMap.get(span.spanID);
Expand All @@ -88,7 +87,7 @@ public static Graph create(Trace trace) {
if(hasOrphanSpan) {
ORPHAN_SPAN_TRACE_COUNTER.inc();
}

*/
return graph;
}

Expand Down