
Commit

implement otel SDK bootstrapping
barend-xebia committed Jul 26, 2024
1 parent e395b2f commit b6cd4b4
Showing 10 changed files with 173 additions and 18 deletions.
51 changes: 51 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -27,9 +27,60 @@ Instrumenting for telemetry is useless until you publish the recorded data somewhere.

If you're using Spark on top of Kubernetes, you should install and configure the [OpenTelemetry Operator][ot-k8s-oper]. In any other deployment you should publish the appropriate [environment variables for autoconf][ot-auto-env].

### Customizing OpenTelemetry AutoConfigure

The automatic configuration is controlled by a set of environment variables or JVM system properties. These are documented here: [configuration][otel-config].

```bash
export OTEL_TRACES_EXPORTER=zipkin
export OTEL_EXPORTER_ZIPKIN_ENDPOINT=http://localhost:9411/api/v2/spans
```

Note: if you use the Kubernetes Operator, these variables are controlled there.
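As a sketch of the system-property alternative: if environment variables are awkward to set on the Spark driver, the same autoconfiguration values can be passed as JVM system properties via `spark.driver.extraJavaOptions`. The property names are the lowercase, dot-separated forms of the `OTEL_*` variables; `com.example.MySparkJob` stands in for your own application, as elsewhere in this README.

```shell
# Sketch: equivalent OpenTelemetry autoconfiguration via JVM system properties.
# Property names are the lowercase, dot-separated forms of the OTEL_* variables.
spark-submit \
  --conf "spark.driver.extraJavaOptions=-Dotel.traces.exporter=zipkin -Dotel.exporter.zipkin.endpoint=http://localhost:9411/api/v2/spans" \
  --conf spark.extraListeners=com.xebia.data.spot.TelemetrySparkListener \
  com.example.MySparkJob
```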

### Configuring OpenTelemetry SDK Manually

If the OpenTelemetry Autoconfigure mechanism doesn't meet your requirements, you can provide your own OpenTelemetry instance programmatically. This requires a few steps:

1. Write a class that implements `com.xebia.data.spot.OpenTelemetrySdkProvider`.
```scala
package com.example

import com.xebia.data.spot.OpenTelemetrySdkProvider
import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.sdk.OpenTelemetrySdk

class MyCustomProvider extends OpenTelemetrySdkProvider {
  override def get(config: Map[String, String]): OpenTelemetry = OpenTelemetrySdk.builder()
    // customize SDK construction
    .build()
}
```
2. Make the compiled class available to your Spark environment.
3. Add `com.xebia.data.spot.sdkProvider` to your spark config, referencing your implementation.
```diff
SCALA_VERSION=2.12 # This will be 2.12 or 2.13, whichever matches your Spark deployment.
spark-submit \
--jars com.xebia.data.spot.spot-complete_${SCALA_VERSION}-x.y.z.jar \
--conf spark.extraListeners=com.xebia.data.spot.TelemetrySparkListener \
+ --conf com.xebia.data.spot.sdkProvider=com.example.MyCustomProvider \
com.example.MySparkJob
```

## Design Choices

### Crash on initialization failure

If the OpenTelemetry SDK cannot be obtained during startup, we allow the listener, and with it the enclosing Spark job, to crash.

**Trade-off:** the enclosing Spark job can run just fine without the telemetry listener. If we handled initialization errors instead of crashing, we would stay out of the way of the instrumented business process.

**Rationale:** if you instrument the job, you expect to see your telemetry. Fail-fast behaviour ensures no telemetry is silently lost.



[ot-auto]: https://opentelemetry.io/docs/languages/java/instrumentation/#automatic-configuration
[ot-auto-env]: https://opentelemetry.io/docs/languages/java/configuration/
[ot-col]: https://opentelemetry.io/docs/collector/
[otel-config]: https://opentelemetry.io/docs/languages/java/configuration/
[ot-export]: https://opentelemetry.io/ecosystem/registry/?component=exporter
[ot-home]: https://opentelemetry.io/
[ot-k8s-oper]: https://opentelemetry.io/docs/kubernetes/operator/
10 changes: 8 additions & 2 deletions build.sbt
@@ -16,7 +16,9 @@ lazy val spot = project
name := "spot",
libraryDependencies ++= Seq(
`opentelemetry-api`,
-`spark-core` % Provided
+`spark-core` % Provided,
+scalaTest % Test,
+scalactic % Test
),
)

@@ -27,7 +29,11 @@ lazy val `spot-complete` = project
name := "spot-complete",
libraryDependencies ++= Seq(
`opentelemetry-sdk`,
-`opentelemetry-sdk-autoconfigure`
+`opentelemetry-sdk-autoconfigure`,
+`opentelemetry-exporter-otlp`,
+`spark-core` % Provided,
+scalaTest % Test,
+scalactic % Test
),
assembly / assemblyJarName := s"${name.value}_${scalaBinaryVersion.value}-${version.value}.jar",
assembly / assemblyOption ~= {
8 changes: 5 additions & 3 deletions project/Dependencies.scala
@@ -1,11 +1,13 @@
import sbt._

object Dependencies {
-private[this] val openTelemetryVersion = "1.39.0"
-private[this] val openTelemetryAutoConf = "1.38.0"
+private[this] val openTelemetryVersion = "1.40.0"

val `opentelemetry-api` = "io.opentelemetry" % "opentelemetry-api" % openTelemetryVersion
val `opentelemetry-sdk` = "io.opentelemetry" % "opentelemetry-sdk" % openTelemetryVersion
-val `opentelemetry-sdk-autoconfigure` = "io.opentelemetry" % "opentelemetry-sdk-extension-autoconfigure" % openTelemetryAutoConf
+val `opentelemetry-sdk-autoconfigure` = "io.opentelemetry" % "opentelemetry-sdk-extension-autoconfigure" % openTelemetryVersion
+val `opentelemetry-exporter-otlp` = "io.opentelemetry" % "opentelemetry-exporter-otlp" % openTelemetryVersion
val `spark-core` = "org.apache.spark" %% "spark-core" % "3.5.1"
+val scalactic = "org.scalactic" %% "scalactic" % "3.2.19"
+val scalaTest = "org.scalatest" %% "scalatest" % "3.2.19"
}
@@ -0,0 +1,12 @@
package com.xebia.data.spot.autoconf

import com.xebia.data.spot.OpenTelemetrySdkProvider
import io.opentelemetry.sdk.OpenTelemetrySdk
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk

class SdkProvider extends OpenTelemetrySdkProvider {

override def get(config: Map[String, String]): OpenTelemetrySdk = {
AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk
}
}
@@ -0,0 +1,19 @@
package com.xebia.data.spot

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should

class AutoconfiguredOpenTelemetrySdkProviderTest extends AnyFlatSpec with should.Matchers with ConsoleTelemetry {

behavior of "OpenTelemetrySupport"

it should "use the AutoConfiguredOpenTelemetrySdk if no config is provided" in {
val uh = new TestOpenTelemetrySupport()
// TODO improve verification;
uh.openTelemetry should not be (null)
}
}

private[this] class TestOpenTelemetrySupport extends OpenTelemetrySupport {
override def spotConfig: Map[String, String] = Map.empty
}
@@ -0,0 +1,19 @@
package com.xebia.data.spot

import org.scalactic.source.Position
import org.scalatest.{BeforeAndAfter, Suite}

trait ConsoleTelemetry extends Suite with BeforeAndAfter {

override protected def before(fun: => Any)(implicit pos: Position): Unit = {
sys.props += "otel.logs.exporter" -> "console"
sys.props += "otel.metrics.exporter" -> "console"
sys.props += "otel.traces.exporter" -> "console"
}

override protected def after(fun: => Any)(implicit pos: Position): Unit = {
sys.props -= "otel.logs.exporter"
sys.props -= "otel.metrics.exporter"
sys.props -= "otel.traces.exporter"
}
}
@@ -0,0 +1,16 @@
package com.xebia.data.spot

import io.opentelemetry.api.OpenTelemetry

/**
* Enables spot to obtain an OpenTelemetry SDK instance.
*/
trait OpenTelemetrySdkProvider {
/**
* Returns an instance of [[OpenTelemetry]].
*
* @param config all SparkConf values.
* @return an instance of [[OpenTelemetry]].
*/
def get(config: Map[String, String]): OpenTelemetry
}
28 changes: 17 additions & 11 deletions spot/src/main/scala/com/xebia/data/spot/OpenTelemetrySupport.scala
@@ -1,18 +1,24 @@
package com.xebia.data.spot

-import io.opentelemetry.api.GlobalOpenTelemetry
+import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.api.trace.Tracer
-import org.apache.spark.SparkConf

-private[spot] trait OpenTelemetrySupport {
-  def conf: OpenTelemetryConfig = ???
-  lazy val tracer: Tracer = ???
-}
+/**
+ * Grants access to an OpenTelemetry instance.
+ *
+ * If no configuration is provided, this attempts to load the spot.autoconf.SdkProvider, which is defined in the "spot-
+ * complete" subproject. If the configuration contains a value for the key 'com.xebia.data.spot.sdkProvider', it
+ * attempts to load the class indicated by that value.
+ */
+trait OpenTelemetrySupport {
+  def spotConfig: Map[String, String]

-private[spot] case class OpenTelemetryConfig() {
+  val openTelemetry: OpenTelemetry = {
+    val provFQCN = spotConfig.getOrElse("com.xebia.data.spot.sdkProvider", "com.xebia.data.spot.autoconf.SdkProvider")
+    val provClass = java.lang.Class.forName(provFQCN)
+    val provider = provClass.getDeclaredConstructor().newInstance().asInstanceOf[OpenTelemetrySdkProvider]
+    provider.get(spotConfig)
+  }

-}
-
-private[spot] object OpenTelemetryConfig {
-  def from(conf: SparkConf): OpenTelemetryConfig = ???
+  lazy val tracer: Tracer = ???
}
Expand Up @@ -3,7 +3,7 @@ package com.xebia.data.spot
import io.opentelemetry.api.trace.{Span, StatusCode}
import io.opentelemetry.context.Context
import org.apache.spark.SparkConf
-import org.apache.spark.scheduler.{JobFailed, JobSucceeded, SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart, SparkListenerJobEnd, SparkListenerJobStart, SparkListenerStageCompleted}
+import org.apache.spark.scheduler.{JobSucceeded, SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart, SparkListenerJobEnd, SparkListenerJobStart, SparkListenerStageCompleted}
import org.slf4j.{Logger, LoggerFactory}

import scala.collection.mutable
@@ -24,7 +24,8 @@ class TelemetrySparkListener(val sparkConf: SparkConf) extends SparkListener wit
@transient
protected lazy val logger: Logger = LoggerFactory.getLogger(getClass.getName)

-override val conf = OpenTelemetryConfig.from(sparkConf)
+override def spotConfig: Map[String, String] = sparkConf.getAll.toMap

private var applicationSpan: Option[(Span, Context)] = None
private val jobSpans = mutable.Map[Int, (Span, Context)]()

@@ -0,0 +1,23 @@
package com.xebia.data.spot

import io.opentelemetry.api.OpenTelemetry
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should

class OpenTelemetrySupportTest extends AnyFlatSpec with should.Matchers {

behavior of "OpenTelemetrySupport"

it should "reflectively create an SDK provider based on 'spot.sdkProvider' config" in {
val uh = new NoopOpenTelemetrySupport("com.xebia.data.spot.sdkProvider" -> classOf[NoopSdkProvider].getName)
uh.openTelemetry should be theSameInstanceAs OpenTelemetry.noop()
}
}

class NoopOpenTelemetrySupport(config: (String, String)*) extends OpenTelemetrySupport {
override def spotConfig: Map[String, String] = Map(config:_*)
}

class NoopSdkProvider extends OpenTelemetrySdkProvider {
override def get(config: Map[String, String]): OpenTelemetry = OpenTelemetry.noop()
}
