Skip to content

Commit

Permalink
skeleton impl, leaving all the hard parts
Browse files Browse the repository at this point in the history
  • Loading branch information
barend-xebia committed Jun 28, 2024
1 parent bd95595 commit 0fe1e28
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 1 deletion.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
.idea/
.bsp/
.settings/
target/
.DS_Store
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

```bash
spark-submit \
--conf spark.extraListeners=com.xebia.data.spartel.OpentelSparkListener \
--conf spark.extraListeners=com.xebia.data.spot.TelemetrySparkListener \
...
```
17 changes: 17 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
ThisBuild / organization := "com.xebia.data"
ThisBuild / scalaVersion := "2.13.13"
ThisBuild / crossScalaVersions := Seq("2.12.18", "2.13.13")

// Single source of truth for the OpenTelemetry version. Previously the api/sdk
// artifacts were at 1.37.0 while sdk-extension-autoconfigure was at 1.34.0;
// mixing OTel artifact versions risks binary incompatibilities at runtime.
val openTelemetryVersion = "1.37.0"

lazy val spot = project
  .in(file("spot"))
  .settings(
    name := "spot",
    libraryDependencies ++= Seq(
      // NOTE(review): the Spark runtime that loads the listener already ships
      // spark-core — consider scoping this % Provided so it is not bundled.
      "org.apache.spark" %% "spark-core" % "3.5.1",

      "io.opentelemetry" % "opentelemetry-api" % openTelemetryVersion,
      // SDK is only needed at runtime; compile against the api alone.
      "io.opentelemetry" % "opentelemetry-sdk" % openTelemetryVersion % Runtime,

      // Optional: lets deployments configure exporters via environment/system
      // properties without making it a hard dependency of this library.
      "io.opentelemetry" % "opentelemetry-sdk-extension-autoconfigure" % openTelemetryVersion % Optional,
    )
  )
1 change: 1 addition & 0 deletions project/build.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sbt.version=1.9.9
18 changes: 18 additions & 0 deletions spot/src/main/scala/com/xebia/data/spot/OpenTelemetrySupport.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.xebia.data.spot

import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.trace.Tracer
import org.apache.spark.SparkConf

/**
 * Grants access to the OpenTelemetry configuration and a [[Tracer]] for span creation.
 *
 * Skeleton: both members are left unimplemented (`???`) and will throw
 * `NotImplementedError` when first evaluated.
 */
private[spot] trait OpenTelemetrySupport {
  // TODO(skeleton): derive from the Spark configuration (see OpenTelemetryConfig.from).
  def conf: OpenTelemetryConfig = ???
  // lazy: the NotImplementedError is deferred until first use rather than thrown at mix-in time.
  lazy val tracer: Tracer = ???
}

/** Configuration for the OpenTelemetry integration. Fields are yet to be defined (skeleton). */
private[spot] case class OpenTelemetryConfig()

/** Factory for [[OpenTelemetryConfig]]. */
private[spot] object OpenTelemetryConfig {
  // TODO(skeleton): extract the relevant settings from the SparkConf — currently unimplemented.
  def from(conf: SparkConf): OpenTelemetryConfig = ???
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.xebia.data.spot

import io.opentelemetry.api.common.AttributeKey

/**
 * Attribute keys attached to telemetry spans, all under the `spark.` namespace.
 * The values mirror fields of Spark's application-start listener event.
 */
object TelemetrySpanAttributes {
  val appAttemptId: AttributeKey[String] = AttributeKey.stringKey("spark.appAttemptId")
  val appId: AttributeKey[String] = AttributeKey.stringKey("spark.appId")
  val appName: AttributeKey[String] = AttributeKey.stringKey("spark.appName")
  val sparkUser: AttributeKey[String] = AttributeKey.stringKey("spark.user")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package com.xebia.data.spot

import io.opentelemetry.api.trace.{Span, StatusCode}
import io.opentelemetry.context.Context
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{JobFailed, JobSucceeded, SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart, SparkListenerJobEnd, SparkListenerJobStart, SparkListenerStageCompleted}
import org.slf4j.{Logger, LoggerFactory}

import scala.collection.mutable

/**
 * A SparkListener that publishes job telemetry to OpenTelemetry.
 *
 * Usage:
 * {{{
 * spark-submit \
 *   --conf=spark.extraListeners=com.xebia.data.spot.TelemetrySparkListener \
 *   com.example.MySparkJob
 * }}}
 *
 * @param sparkConf the `SparkConf`. This is provided automatically by the Spark application as it bootstraps.
 */
class TelemetrySparkListener(val sparkConf: SparkConf) extends SparkListener with OpenTelemetrySupport {
  @transient
  protected lazy val logger: Logger = LoggerFactory.getLogger(getClass.getName)

  override val conf: OpenTelemetryConfig = OpenTelemetryConfig.from(sparkConf)

  // Root span covering the whole application, plus the Context it is stored in
  // so job spans can be parented to it.
  private var applicationSpan: Option[(Span, Context)] = None
  // Open spans for in-flight jobs, keyed by jobId; entries are removed in onJobEnd.
  private val jobSpans = mutable.Map[Int, (Span, Context)]()

  override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
    val sb = tracer.spanBuilder(s"application-${event.appName}")
      .setAttribute(TelemetrySpanAttributes.appName, event.appName)
      .setAttribute(TelemetrySpanAttributes.sparkUser, event.sparkUser)
    event.appId.foreach(sb.setAttribute(TelemetrySpanAttributes.appId, _))
    // Fixed: previously this read event.appId, so the attemptId attribute carried the app id.
    event.appAttemptId.foreach(sb.setAttribute(TelemetrySpanAttributes.appAttemptId, _))
    val span = sb.startSpan()
    val context = span.storeInContext(Context.root())
    applicationSpan = Some((span, context))
  }

  override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
    applicationSpan match {
      case Some((span, _)) =>
        span.end()
        // Clear the reference: the span must not be reused after end().
        applicationSpan = None
      case None =>
        logger.warn("Received onApplicationEnd, but found no tracing Span.")
    }
  }

  override def onJobStart(event: SparkListenerJobStart): Unit = {
    applicationSpan match {
      case Some((_, parentContext)) =>
        val span = tracer.spanBuilder("job-%05d".format(event.jobId))
          .setParent(parentContext)
          .startSpan()
        val context = span.storeInContext(parentContext)
        jobSpans += event.jobId -> (span, context)
      case None =>
        logger.warn(s"Received onJobStart for jobId=${event.jobId}, but the application Span does not exist yet.")
    }
  }

  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
    // remove (not get): finished jobs must not accumulate in the map for the
    // lifetime of a long-running application.
    jobSpans.remove(event.jobId) match {
      case Some((span, _)) =>
        event.jobResult match {
          case JobSucceeded =>
            span.setStatus(StatusCode.OK)
          case JobFailed(exception) =>
            // Attach the failure cause to the span before marking it as errored.
            span.recordException(exception)
            span.setStatus(StatusCode.ERROR)
          case _ =>
            span.setStatus(StatusCode.ERROR)
        }
        span.end()
      case None =>
        logger.warn(s"Received onJobEnd for jobId=${event.jobId}, but found no tracing Span.")
    }
  }

  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
    // TODO(skeleton): candidate metrics to record on a stage span:
    //   event.stageInfo.rddInfos.foreach(_.memSize)  // memSize is interesting
    //   event.stageInfo.taskMetrics.peakExecutionMemory
  }
}
8 changes: 8 additions & 0 deletions spot/src/main/scala/com/xebia/data/spot/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.xebia.data

/**
 * Spot: Spark-OpenTelemetry integration.
 *
 * The entry point is [[TelemetrySparkListener]], registered with a Spark
 * application via the `spark.extraListeners` configuration key.
 */
package object spot {

}

0 comments on commit 0fe1e28

Please sign in to comment.