spark-submit insists all conf is prefixed 'spark.'
The _simplest thing that could possibly work™_ is to require the prefix on all
our own configuration keys. This is the approach taken by Apache Iceberg. The
purist in me wanted to first look up the keys under the "com.xebia.[...]"
prefix and, only if that came up empty, to try again with the
"spark.com.xebia.[...]" prefix. This was rejected for not being the simplest
thing that could possibly work.

A future version could take this chained approach, and also look for JVM System
Properties and environment variables as configuration sources. For now, this
will do just fine.
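The chained approach could look something like the sketch below. This is illustrative only: `ChainedConfig` and `resolve` are hypothetical names, not part of spot's current API, and a plain `Map` stands in for `SparkConf` to keep the example self-contained.

```scala
// Hypothetical sketch of the chained lookup described above; not spot's
// actual implementation.
object ChainedConfig {
  private val BasePrefix = "com.xebia.data.spot."

  /** Resolve `key` by trying, in order: the bare key, the "spark."-prefixed
    * key, a JVM system property, and finally an environment variable. */
  def resolve(conf: Map[String, String],
              sysProps: Map[String, String],
              env: Map[String, String],
              key: String): Option[String] = {
    val bare     = BasePrefix + key
    val prefixed = "spark." + bare
    conf.get(bare)                                      // 1. bare key in SparkConf
      .orElse(conf.get(prefixed))                       // 2. "spark."-prefixed key
      .orElse(sysProps.get(bare))                       // 3. JVM system property
      .orElse(env.get(bare.toUpperCase.replace('.', '_'))) // 4. environment variable
  }
}
```

In a real implementation, `conf` would be the job's `SparkConf` and `sysProps`/`env` would come from `sys.props` and `sys.env`.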
barend-xebia committed Nov 27, 2024
1 parent e3f2934 commit df950a4
Showing 6 changed files with 13 additions and 13 deletions.
8 changes: 4 additions & 4 deletions README.md
@@ -45,11 +45,11 @@ To use context propagation, provide the necessary headers as SparkConf values. T
spark-submit \
--jar com.xebia.data.spot.spot-complete-${SPARK_VERSION}_${SCALA_VERSION}-x.y.z.jar \
--conf spark.extraListeners=com.xebia.data.spot.TelemetrySparkListener \
- --conf com.xebia.data.spot.traceparent=00-1234abcd5678abcd-1234abcd-01 \
+ --conf spark.com.xebia.data.spot.traceparent=00-1234abcd5678abcd-1234abcd-01 \
com.example.MySparkJob
```

- All SparkConf values that start with `com.xebia.data.spot.` are made available to the `ContextPropagator`. If you use a different propagator than the default, you can prefix its required keys accordingly.
+ All SparkConf values that start with `spark.com.xebia.data.spot.` are made available to the `ContextPropagator`. If you use a different propagator than the default, you can prefix its required keys accordingly.

### Prerequisites

@@ -85,14 +85,14 @@ If the OpenTelemetry Autoconfigure mechanism doesn't meet your requirements, you
}
```
2. Make the compiled class available to your Spark environment.
- 3. Add `com.xebia.data.spot.sdkProvider` to your spark config, referencing your implementation.
+ 3. Add `spark.com.xebia.data.spot.sdkProvider` to your spark config, referencing your implementation.
```diff
SPARK_VERSION=3.5
SCALA_VERSION=2.12 # This will be 2.12 or 2.13, whichever matches your Spark deployment.
spark-submit \
--jar com.xebia.data.spot.spot-complete-${SPARK_VERSION}_${SCALA_VERSION}-x.y.z.jar \
--conf spark.extraListeners=com.xebia.data.spot.TelemetrySparkListener \
- --conf com.xebia.data.spot.sdkProvider=com.example.MyCustomProvider \
+ --conf spark.com.xebia.data.spot.sdkProvider=com.example.MyCustomProvider \
com.example.MySparkJob
```

@@ -8,7 +8,7 @@ import org.slf4j.{Logger, LoggerFactory}
* Grants access to an OpenTelemetry instance.
*
* If no configuration is provided, this attempts to load the spot.autoconf.SdkProvider, which is defined in the "spot-
- * complete" subproject. If the configuration contains a value for the key 'com.xebia.data.spot.sdkProvider', it
+ * complete" subproject. If the configuration contains a value for the key 'spark.com.xebia.data.spot.sdkProvider', it
* attempts to load the class indicated by that value.
*/
trait OpenTelemetrySupport {
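The provider selection that the scaladoc above describes can be summarized as: use the class named under the `sdkProvider` key if present, else fall back to the default. A minimal sketch, with `ProviderSelection` as an illustrative stand-in (spot's real code additionally loads the class reflectively):

```scala
// Hedged sketch of the provider selection described above; names are
// illustrative, not spot's actual API.
object ProviderSelection {
  val DefaultProviderFqcn = "com.xebia.data.spot.autoconf.SdkProvider"
  val SdkProviderKey      = "spark.com.xebia.data.spot.sdkProvider"

  /** Pick the configured provider class name, falling back to the default. */
  def providerClassName(conf: Map[String, String]): String =
    conf.getOrElse(SdkProviderKey, DefaultProviderFqcn)
}
```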
2 changes: 1 addition & 1 deletion spot/src/main/scala/com/xebia/data/spot/package.scala
@@ -9,7 +9,7 @@ package object spot {
val DEFAULT_PROVIDER_FQCN = "com.xebia.data.spot.autoconf.SdkProvider"

/** Common prefix for all our keys in the SparkConf. */
- val SPOT_CONFIG_PREFIX = "com.xebia.data.spot."
+ val SPOT_CONFIG_PREFIX = "spark.com.xebia.data.spot."

/** Returns the given config key as prefixed with [[SPOT_CONFIG_PREFIX]]. */
@inline
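For context, the helper mentioned in the diff above amounts to simple string concatenation. A self-contained sketch, with `SpotKeys` as an illustrative stand-in for spot's package object:

```scala
// Minimal sketch of the prefix helper; in spot this lives in the
// com.xebia.data.spot package object.
object SpotKeys {
  val SPOT_CONFIG_PREFIX = "spark.com.xebia.data.spot."

  /** Returns the given config key prefixed with SPOT_CONFIG_PREFIX. */
  @inline def prefixed(key: String): String = SPOT_CONFIG_PREFIX + key
}
```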
@@ -9,12 +9,12 @@ class GetContextFromConfigTest extends AnyFlatSpecLike {

behavior of "GetContextFromConfigTest"

- it should "only return keys in the com.xebia.data.spot namespace, with prefix removed" in new ContextFromConfigTest {
+ it should "only return keys in the spark.com.xebia.data.spot namespace, with prefix removed" in new ContextFromConfigTest {
val keys = getContextFromConfig.keys(spotConfig).asScala
keys should contain only("abc", "xyz")
}

- it should "get values by applying the com.xebia.data.spot prefix" in new ContextFromConfigTest {
+ it should "get values by applying the spark.com.xebia.data.spot prefix" in new ContextFromConfigTest {
getContextFromConfig.get(spotConfig, "abc") should equal ("abc")
getContextFromConfig.get(spotConfig, "xyz") should equal ("xyz")
}
@@ -27,7 +27,7 @@ private[this] trait ContextFromConfigTest {
"spark.executor.cores" -> "8",
"abc" -> "def",
"xyz" -> "123",
- "com.xebia.data.spot.abc" -> "abc",
- "com.xebia.data.spot.xyz" -> "xyz"
+ "spark.com.xebia.data.spot.abc" -> "abc",
+ "spark.com.xebia.data.spot.xyz" -> "xyz"
)
}
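The fixture above exercises the prefix-filtering behavior: only `spark.com.xebia.data.spot.*` entries survive, with the prefix stripped. A minimal sketch of that filtering (illustrative only, not spot's actual `ContextPropagation` code):

```scala
// Sketch of the carrier-extraction behavior the tests above assert.
object CarrierKeys {
  private val Prefix = "spark.com.xebia.data.spot."

  /** Keep only prefixed entries, with the prefix stripped off the key. */
  def carrier(conf: Map[String, String]): Map[String, String] =
    conf.collect {
      case (k, v) if k.startsWith(Prefix) => k.stripPrefix(Prefix) -> v
    }
}
```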
@@ -9,7 +9,7 @@ class OpenTelemetrySupportTest extends AnyFlatSpec with should.Matchers {
behavior of "OpenTelemetrySupport"

it should "reflectively create an SDK provider based on 'spot.sdkProvider' config" in {
- val uh = new NoopOpenTelemetrySupport("com.xebia.data.spot.sdkProvider" -> classOf[NoopSdkProvider].getName)
+ val uh = new NoopOpenTelemetrySupport("spark.com.xebia.data.spot.sdkProvider" -> classOf[NoopSdkProvider].getName)
uh.openTelemetry should be theSameInstanceAs OpenTelemetry.noop()
}
}
@@ -62,7 +62,7 @@ class TelemetrySparkListenerTest extends AnyFlatSpecLike with BeforeAndAfterEach
.hasStatus(StatusData.ok())
}

- it should "get traceId from config if provided" in new TestTelemetrySparkListener("com.xebia.data.spot.traceparent" -> "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01") {
+ it should "get traceId from config if provided" in new TestTelemetrySparkListener("spark.com.xebia.data.spot.traceparent" -> "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01") {
tsl.onApplicationStart(SparkListenerApplicationStart("testapp", Some("ta123"), 100L, "User", Some("1"), None, None))
tsl.onApplicationEnd(SparkListenerApplicationEnd(5200L))
val appSpan = getFinishedSpanItems.get(0)
@@ -76,7 +76,7 @@ class TestTelemetrySparkListener(extraConf: (String, String)*) {
class TestTelemetrySparkListener(extraConf: (String, String)*) {
lazy val tsl: TelemetrySparkListener = {
val conf: SparkConf = new SparkConf()
- conf.set("com.xebia.data.spot.sdkProvider", classOf[TestingSdkProvider].getName)
+ conf.set("spark.com.xebia.data.spot.sdkProvider", classOf[TestingSdkProvider].getName)
conf.setAll(extraConf)
new TelemetrySparkListener(conf)
}
