Skip to content

Commit

Permalink
Add the first unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
barend-xebia committed Sep 27, 2024
1 parent 6236640 commit ea9c991
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 2 deletions.
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ lazy val spot = project
`opentelemetry-api`,
`spark-core` % Provided,
scalaTest % Test,
scalactic % Test
scalactic % Test,
`opentelemetry-sdk-testing` % Test,
`assertj-core` % Test,
),
)

Expand Down
2 changes: 2 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ object Dependencies {
val `opentelemetry-api` = "io.opentelemetry" % "opentelemetry-api" % openTelemetryVersion
val `opentelemetry-sdk` = "io.opentelemetry" % "opentelemetry-sdk" % openTelemetryVersion
val `opentelemetry-sdk-autoconfigure` = "io.opentelemetry" % "opentelemetry-sdk-extension-autoconfigure" % openTelemetryVersion
val `opentelemetry-sdk-testing` = "io.opentelemetry" % "opentelemetry-sdk-testing" % openTelemetryVersion
val `opentelemetry-exporter-otlp` = "io.opentelemetry" % "opentelemetry-exporter-otlp" % openTelemetryVersion
val `spark-core` = "org.apache.spark" %% "spark-core" % "3.5.3"
val scalactic = "org.scalactic" %% "scalactic" % "3.2.19"
val scalaTest = "org.scalatest" %% "scalatest" % "3.2.19"
val `assertj-core` = "org.assertj" % "assertj-core" % "3.26.3"
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class TelemetrySparkListener(val sparkConf: SparkConf) extends SparkListener wit
.setAttribute(TelemetrySpanAttributes.appName, event.appName)
.setAttribute(TelemetrySpanAttributes.sparkUser, event.sparkUser)
event.appId.foreach(sb.setAttribute(TelemetrySpanAttributes.appId, _))
event.appId.foreach(sb.setAttribute(TelemetrySpanAttributes.appAttemptId, _))
event.appAttemptId.foreach(sb.setAttribute(TelemetrySpanAttributes.appAttemptId, _))
val span = sb.startSpan()
val context = span.storeInContext(Context.root())
applicationSpan = Some((span, context))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package com.xebia.data.spot

import com.xebia.data.spot.TestingSdkProvider.testingSdk
import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.sdk.OpenTelemetrySdk
import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter
import io.opentelemetry.sdk.testing.time.TestClock
import io.opentelemetry.sdk.trace.`export`.SimpleSpanProcessor
import io.opentelemetry.sdk.trace.data.{SpanData, StatusData}
import io.opentelemetry.sdk.trace.SdkTracerProvider
import io.opentelemetry.sdk.trace.samplers.Sampler
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{JobSucceeded, SparkListenerApplicationEnd, SparkListenerApplicationStart, SparkListenerJobEnd, SparkListenerJobStart}
import org.scalatest.{BeforeAndAfterEach, Suite}
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.matchers.should.Matchers

import java.time.Duration
import java.util

/** Exercises [[TelemetrySparkListener]] end-to-end against an in-memory OpenTelemetry SDK. */
class TelemetrySparkListenerTest extends AnyFlatSpecLike with TelemetrySparkListenerTestSupport {
  import Matchers._

  behavior of "TelemetrySparkListener"

  it should "generate the expected Trace for simulated events" in {
    // Drive the listener through a minimal application lifecycle, advancing the
    // shared test clock between events so the spans get deterministic timings:
    // app start -> job start -> job end -> app end.
    tsl.onApplicationStart(SparkListenerApplicationStart("testapp", Some("ta123"), 100L, "User", Some("1"), None, None))
    advanceTimeBy(Duration.ofMillis(200))
    tsl.onJobStart(SparkListenerJobStart(1, 200L, Seq.empty, null))
    advanceTimeBy(Duration.ofMillis(5000))
    tsl.onJobEnd(SparkListenerJobEnd(1, 5000, JobSucceeded))
    advanceTimeBy(Duration.ofMillis(100))
    tsl.onApplicationEnd(SparkListenerApplicationEnd(5200L))

    val finishedSpans = getFinishedSpanItems
    finishedSpans should have length (2)

    // Spans are exported in the order they ended: the job span closes before
    // the application span, so index 0 is the job and index 1 is the app.
    val jobSpan = finishedSpans.get(0)
    val appSpan = finishedSpans.get(1)

    assertThat(appSpan)
      .isSampled
      .hasEnded
      .hasNoParent
      .hasName("application-testapp")
      .hasAttribute(TelemetrySpanAttributes.appId, "ta123")
      .hasAttribute(TelemetrySpanAttributes.appName, "testapp")
      .hasAttribute(TelemetrySpanAttributes.appAttemptId, "1")
      .hasAttribute(TelemetrySpanAttributes.sparkUser, "User")

    assertThat(jobSpan)
      .isSampled
      .hasEnded
      .hasParent(appSpan)
      .hasName("job-00001")
      .hasStatus(StatusData.ok())
  }
}

/** Test fixture: wires a [[TelemetrySparkListener]] to the shared in-memory SDK
  * from [[TestingSdkProvider]] and resets captured spans after each test. */
trait TelemetrySparkListenerTestSupport extends Suite with BeforeAndAfterEach {
  /** Listener under test, configured to obtain its OpenTelemetry SDK from [[TestingSdkProvider]]. */
  lazy val tsl: TelemetrySparkListener = {
    val conf: SparkConf = new SparkConf()
    conf.set("com.xebia.data.spot.sdkProvider", classOf[TestingSdkProvider].getName)
    new TelemetrySparkListener(conf)
  }

  /** Advances the shared test clock so span timings are deterministic. */
  def advanceTimeBy(duration: Duration): Unit = TestingSdkProvider.clock.advance(duration)

  /** Returns every span exported so far (flushes the exporter first). */
  def getFinishedSpanItems: util.List[SpanData] = TestingSdkProvider.getFinishedSpanItems

  /** Clears captured spans between tests. Calls `super.afterEach()` so this trait
    * stays stackable with other `BeforeAndAfterEach` mix-ins, and resets in a
    * `finally` so cleanup happens even if a super implementation throws. */
  override protected def afterEach(): Unit =
    try super.afterEach()
    finally TestingSdkProvider.reset()
}

/** Shared, process-wide test instrumentation: a controllable clock, an in-memory
  * span exporter, and an OpenTelemetry SDK wired to both. */
object TestingSdkProvider {
  // Mutable test clock; tests advance it explicitly for deterministic span timings.
  private[spot] val clock: TestClock = TestClock.create()

  // Captures every exported span in memory for later inspection.
  private[spot] val spanExporter: InMemorySpanExporter = InMemorySpanExporter.create()

  // SDK configured with the test clock, an always-on sampler, and a simple
  // (synchronous) processor feeding the in-memory exporter.
  private[spot] val testingSdk: OpenTelemetrySdk = {
    val processor = SimpleSpanProcessor.builder(spanExporter).build()
    val tracerProvider = SdkTracerProvider
      .builder()
      .setClock(clock)
      .setSampler(Sampler.alwaysOn())
      .addSpanProcessor(processor)
      .build()
    OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build()
  }

  /** Flushes the exporter, then returns all spans that have ended so far. */
  def getFinishedSpanItems: util.List[SpanData] = {
    spanExporter.flush()
    spanExporter.getFinishedSpanItems
  }

  /** Discards all captured spans; call between tests. */
  def reset(): Unit = spanExporter.reset()
}

class TestingSdkProvider extends OpenTelemetrySdkProvider {
override def get(config: Map[String, String]): OpenTelemetry = testingSdk
}

0 comments on commit ea9c991

Please sign in to comment.