From e3f29343530e205b87d368011aaae55a3d4a6092 Mon Sep 17 00:00:00 2001
From: Barend Garvelink <159024183+barend-xebia@users.noreply.github.com>
Date: Fri, 22 Nov 2024 16:26:44 +0100
Subject: [PATCH] make spans current in context

---
 .../data/spot/TelemetrySparkListener.scala    | 42 +++++++++++++------
 .../spot/TelemetrySparkListenerTest.scala     |  9 ++--
 2 files changed, 35 insertions(+), 16 deletions(-)

diff --git a/spot/src/main/scala/com/xebia/data/spot/TelemetrySparkListener.scala b/spot/src/main/scala/com/xebia/data/spot/TelemetrySparkListener.scala
index e3c77d7..9552069 100644
--- a/spot/src/main/scala/com/xebia/data/spot/TelemetrySparkListener.scala
+++ b/spot/src/main/scala/com/xebia/data/spot/TelemetrySparkListener.scala
@@ -1,7 +1,7 @@
 package com.xebia.data.spot
 
 import io.opentelemetry.api.trace.{Span, StatusCode}
-import io.opentelemetry.context.Context
+import io.opentelemetry.context.{Context, Scope}
 import org.apache.spark.SparkConf
 import org.apache.spark.scheduler.{JobSucceeded, SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart, SparkListenerJobEnd, SparkListenerJobStart, SparkListenerStageCompleted}
 import org.slf4j.{Logger, LoggerFactory}
@@ -21,58 +21,69 @@ import scala.collection.mutable
  * @param sparkConf the `SparkConf`. This is provided automatically by the Spark application as it bootstraps.
  */
 class TelemetrySparkListener(val sparkConf: SparkConf) extends SparkListener with OpenTelemetrySupport {
+  import com.xebia.data.spot.TelemetrySparkListener.{PendingContext, PendingSpan}
+
   @transient
-  protected lazy val logger: Logger = LoggerFactory.getLogger(getClass.getName)
+  protected val logger: Logger = LoggerFactory.getLogger(getClass.getName)
+  logger.info(s"TelemetrySparkListener starting up: ${System.identityHashCode(this)}")
 
   override def spotConfig: Map[String, String] = sparkConf.getAll.toMap
 
-  private var applicationSpan: Option[(Span, Context)] = None
-  private val jobSpans = mutable.Map[Int, (Span, Context)]()
+  private var applicationSpan: Option[PendingSpan] = None
+  private val jobSpans = mutable.Map[Int, PendingSpan]()
 
-  lazy val rootContext: Context = {
-    openTelemetry.getPropagators.getTextMapPropagator.extract(Context.root(), spotConfig, new GetContextFromConfig())
+  lazy val rootContext: PendingContext = {
+    logger.info(s"Find rootcontext; config is ${spotConfig}")
+    val rc = openTelemetry.getPropagators.getTextMapPropagator.extract(Context.root(), spotConfig, new GetContextFromConfig())
+    val scope = rc.makeCurrent()
+    (rc, scope)
   }
 
   override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
     val sb = tracer.spanBuilder(s"application-${event.appName}")
-      .setParent(rootContext)
+      .setParent(rootContext._1)
       .setAttribute(TelemetrySpanAttributes.appName, event.appName)
       .setAttribute(TelemetrySpanAttributes.sparkUser, event.sparkUser)
     event.appId.foreach(sb.setAttribute(TelemetrySpanAttributes.appId, _))
     event.appAttemptId.foreach(sb.setAttribute(TelemetrySpanAttributes.appAttemptId, _))
     val span = sb.startSpan()
-    val context = span.storeInContext(rootContext)
-    applicationSpan = Some((span, context))
+    val scope = span.makeCurrent()
+    val context = span.storeInContext(rootContext._1)
+    applicationSpan = Some((span, context, scope))
   }
 
   override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
     applicationSpan
-      .map { case (span, _) =>
+      .map { case (span, _, scope) =>
         span.end()
+        scope.close()
       }
      .orElse {
        logger.warn("Received onApplicationEnd, but found no tracing Span.")
        None
      }
+    rootContext._2.close()
   }
 
   override def onJobStart(event: SparkListenerJobStart): Unit = {
-    applicationSpan.foreach { case (_, parentContext) =>
+    applicationSpan.foreach { case (_, parentContext, _) =>
       val span = tracer.spanBuilder("job-%05d".format(event.jobId))
         .setParent(parentContext)
         .startSpan()
+      val scope = span.makeCurrent()
       val context = span.storeInContext(parentContext)
-      jobSpans += event.jobId -> (span, context)
+      jobSpans += event.jobId -> (span, context, scope)
     }
   }
 
   override def onJobEnd(event: SparkListenerJobEnd): Unit = {
-    jobSpans.get(event.jobId).foreach { case (span, _) =>
+    jobSpans.get(event.jobId).foreach { case (span, _, scope) =>
       event.jobResult match {
         case JobSucceeded => span.setStatus(StatusCode.OK)
         case _ => span.setStatus(StatusCode.ERROR)
       }
       span.end()
+      scope.close()
     }
   }
 
@@ -82,3 +93,8 @@ class TelemetrySparkListener(val sparkConf: SparkConf) extends SparkListener wit
     // event.stageInfo.taskMetrics.peakExecutionMemory
   }
 }
+
+object TelemetrySparkListener {
+  type PendingContext = (Context, Scope)
+  type PendingSpan = (Span, Context, Scope)
+}
diff --git a/spot/src/test/scala/com/xebia/data/spot/TelemetrySparkListenerTest.scala b/spot/src/test/scala/com/xebia/data/spot/TelemetrySparkListenerTest.scala
index d07aaef..1c97657 100644
--- a/spot/src/test/scala/com/xebia/data/spot/TelemetrySparkListenerTest.scala
+++ b/spot/src/test/scala/com/xebia/data/spot/TelemetrySparkListenerTest.scala
@@ -89,9 +89,12 @@ class TestTelemetrySparkListener(extraConf: (String, String)*) {
 object TestingSdkProvider {
   private[spot] val clock: TestClock = TestClock.create()
   private[spot] val spanExporter: InMemorySpanExporter = InMemorySpanExporter.create()
-  private[spot] val testingSdk: OpenTelemetrySdk = OpenTelemetrySdk.builder().setTracerProvider(
-    SdkTracerProvider.builder().setClock(clock).setSampler(Sampler.alwaysOn()).addSpanProcessor(SimpleSpanProcessor.builder(spanExporter).build()).build()
-  ).setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())).build()
+  private[spot] val testingSdk: OpenTelemetrySdk = {
+    sys.props.put("io.opentelemetry.context.enableStrictContext", "true")
+    OpenTelemetrySdk.builder().setTracerProvider(
+      SdkTracerProvider.builder().setClock(clock).setSampler(Sampler.alwaysOn()).addSpanProcessor(SimpleSpanProcessor.builder(spanExporter).build()).build()
+    ).setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())).build()
+  }
 
   def getFinishedSpanItems: util.List[SpanData] = {
     spanExporter.flush()
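
---

Background on the pattern this patch adopts: in the OpenTelemetry Java API,
Span.makeCurrent() and Context.makeCurrent() return a Scope that must be
closed, on the thread that opened it, once the unit of work ends. That is
why the listener now carries the Scope alongside the Span and Context in the
PendingSpan/PendingContext tuples and closes it in the matching end
callback; the io.opentelemetry.context.enableStrictContext property the test
enables makes the SDK flag scopes that are leaked or closed out of order.
Below is a minimal standalone sketch of that contract; the
GlobalOpenTelemetry-based tracer setup and the names ScopeSketch and
"unit-of-work" are illustrative assumptions, not part of this patch (the
listener obtains its tracer via OpenTelemetrySupport instead).

    import io.opentelemetry.api.GlobalOpenTelemetry
    import io.opentelemetry.api.trace.{Span, Tracer}
    import io.opentelemetry.context.Scope

    object ScopeSketch extends App {
      // Assumption: an SDK has been registered globally; in this patch the
      // tracer comes from OpenTelemetrySupport instead.
      val tracer: Tracer = GlobalOpenTelemetry.getTracer("scope-sketch")

      val span: Span = tracer.spanBuilder("unit-of-work").startSpan()
      val scope: Scope = span.makeCurrent() // Span.current() now resolves to `span`
      try {
        // ... traced work; spans built here pick up `span` as their parent ...
      } finally {
        scope.close() // restore the previous context, on the opening thread
        span.end()    // end the span; independent of closing the scope
      }
    }

The listener cannot use try/finally because makeCurrent() and close() happen
in different SparkListener callbacks, hence the tuples that keep each Scope
alive between the start and end events.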