From fe361a007511208fed820764faa2cf7ca03db7e0 Mon Sep 17 00:00:00 2001 From: Luca Canali Date: Thu, 3 Sep 2020 14:09:20 +0200 Subject: [PATCH] Release 0.17 --- README.md | 79 +++++++++++-------- build.sbt | 4 +- docs/TODO_and_issues.md | 9 +-- .../ch/cern/sparkmeasure/influxdbsink.scala | 1 - .../ch/cern/sparkmeasure/UtilsTest.scala | 14 ++-- 5 files changed, 59 insertions(+), 48 deletions(-) diff --git a/README.md b/README.md index d1c7214..e6cbbf6 100644 --- a/README.md +++ b/README.md @@ -3,25 +3,31 @@ ![sparkMeasure CI](https://github.com/LucaCanali/sparkMeasure/workflows/sparkMeasure%20CI/badge.svg?branch=master&event=push) [![Maven Central](https://maven-badges.herokuapp.com/maven-central/ch.cern.sparkmeasure/spark-measure_2.11/badge.svg)](https://maven-badges.herokuapp.com/maven-central/ch.cern.sparkmeasure/spark-measure_2.11) -### SparkMeasure is a tool for performance troubleshooting of Apache Spark workloads -SparkMeasure simplifies the collection and analysis of Spark performance metrics. -Use sparkMeasure for troubleshooting **interactive and batch** Spark workloads. -Use it also to collect metrics for long-term retention or as part of a **CI/CD** pipeline. +### SparkMeasure is a tool for performance troubleshooting of Apache Spark jobs +SparkMeasure simplifies the collection and analysis of Spark performance metrics. +Use sparkMeasure for troubleshooting **interactive and batch** Spark workloads. +Use it also to collect metrics for long-term retention or as part of a **CI/CD** pipeline. SparkMeasure is also intended as a working example of how to use Spark Listeners for collecting Spark task metrics data. - * Main author and contact: Luca.Canali@cern.ch + credits to Viktor.Khristenko@cern.ch + thanks to PR contributors - * Compatibility: Spark 2.1.x and higher. - -### Getting started with sparkMeasure, by example - * How to use: deploy [sparkMeasure from Maven Central](https://mvnrepository.com/artifact/ch.cern.sparkmeasure/spark-measure) - - Spark 2.x built with scala 2_11: - - Scala: `bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.11:0.16` - - Python: `bin/pyspark --packages ch.cern.sparkmeasure:spark-measure_2.11:0.16` - - note: `pip install sparkmeasure` to get the Python wrapper API. - - Spark 3.0.x and 2.4.x built with scala 2_12: - - Scala: `bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.16` - - Python: `bin/pyspark --packages ch.cern.sparkmeasure:spark-measure_2.12:0.16` - - note: `pip install sparkmeasure` to get the Python wrapper API. - - Bleeding edge: build from master using sbt: `sbt +package` and use the jars instead of packages. + * Main author and contact: + * Luca.Canali@cern.ch + credits to Viktor.Khristenko@cern.ch + thanks to PR contributors + * For Spark 2.x and 3.x + * Tested on Spark 2.4 and 3.0 + * Spark 2.3 -> should also be OK + * Spark 2.1 and 2.2 -> use sparkMeasure version 0.16 + +### Getting started with sparkMeasure + * Note: sparkMeasure is available on [Maven Central](https://mvnrepository.com/artifact/ch.cern.sparkmeasure/spark-measure) + * Spark 3.0.x and 2.4.x with scala 2.12: + - Scala: `bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.17` + - Python: `bin/pyspark --packages ch.cern.sparkmeasure:spark-measure_2.12:0.17` + - note: `pip install sparkmeasure` to get the Python wrapper API. + * Spark 2.x with Scala 2.11: + - Scala: `bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.11:0.17` + - Python: `bin/pyspark --packages ch.cern.sparkmeasure:spark-measure_2.11:0.17` + - note: `pip install sparkmeasure` to get the Python wrapper API. + * Bleeding edge: build sparkMeasure jar using sbt: `sbt +package` and use `--jars` + with the jar just built instead of using `--packages`. + * Note: find the latest jars already built as artifacts in the [GitHub actions](https://github.com/LucaCanali/sparkMeasure/actions) - [ Scala notebook on Databricks](https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/2061385495597958/2729765977711377/442806354506758/latest.html) @@ -33,10 +39,10 @@ SparkMeasure is also intended as a working example of how to use Spark Listeners - [ Local Python/Jupyter Notebook](examples/SparkMeasure_Jupyter_Python_getting_started.ipynb) -- CLI: spark-shell and pyspark +- CLI: spark-shell and PySpark ``` # Scala CLI, Spark 3.0 - bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.16 + bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.17 val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark) stageMetrics.runAndMeasure(spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()) @@ -57,18 +63,17 @@ Spark Context default degree of parallelism = 8 Aggregated Spark stage metrics: numStages => 3 numTasks => 17 -elapsedTime => 14594 (15 s) -stageDuration => 14498 (14 s) -executorRunTime => 108563 (1.8 min) -executorCpuTime => 106613 (1.8 min) -executorDeserializeTime => 4149 (4 s) -executorDeserializeCpuTime => 1025 (1 s) -resultSerializationTime => 1 (1 ms) -jvmGCTime => 64 (64 ms) +elapsedTime => 13520 (14 s) +stageDuration => 13411 (13 s) +executorRunTime => 100020 (1.7 min) +executorCpuTime => 98899 (1.6 min) +executorDeserializeTime => 4358 (4 s) +executorDeserializeCpuTime => 1887 (2 s) +resultSerializationTime => 2 (2 ms) +jvmGCTime => 56 (56 ms) shuffleFetchWaitTime => 0 (0 ms) -shuffleWriteTime => 15 (15 ms) +shuffleWriteTime => 11 (11 ms) resultSize => 19955 (19.0 KB) -numUpdatedBlockStatuses => 0 diskBytesSpilled => 0 (0 Bytes) memoryBytesSpilled => 0 (0 Bytes) peakExecutionMemory => 0 @@ -76,13 +81,25 @@ recordsRead => 2000 bytesRead => 0 (0 Bytes) recordsWritten => 0 bytesWritten => 0 (0 Bytes) -shuffleTotalBytesRead => 472 (472 Bytes) +shuffleRecordsRead => 8 shuffleTotalBlocksFetched => 8 shuffleLocalBlocksFetched => 8 shuffleRemoteBlocksFetched => 0 +shuffleTotalBytesRead => 472 (472 Bytes) +shuffleLocalBytesRead => 472 (472 Bytes) +shuffleRemoteBytesRead => 0 (0 Bytes) +shuffleRemoteBytesReadToDisk => 0 (0 Bytes) shuffleBytesWritten => 472 (472 Bytes) shuffleRecordsWritten => 8 ``` +- CLI: spark-shell, measure workload metrics aggregating from raw task metrics + ``` + # Scala CLI, Spark 3.0 + bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.17 + + val taskMetrics = ch.cern.sparkmeasure.TaskMetrics(spark) + taskMetrics.runAndMeasure(spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()) + ``` ### One tool for different use cases, links to documentation and examples * **Interactive mode**: use sparkMeasure to collect and analyze Spark workload metrics real-time when diff --git a/build.sbt b/build.sbt index 9a595d2..9a9b71e 100644 --- a/build.sbt +++ b/build.sbt @@ -1,13 +1,13 @@ name := "spark-measure" -version := "0.17-SNAPSHOT" +version := "0.17" scalaVersion := "2.12.10" crossScalaVersions := Seq("2.11.12", "2.12.10") licenses += ("Apache-2.0", url("http://www.apache.org/licenses/LICENSE-2.0")) -isSnapshot := true +isSnapshot := false spName := "spark-measure" sparkVersion := "2.4.6" diff --git a/docs/TODO_and_issues.md b/docs/TODO_and_issues.md index e2d943b..a6caefd 100644 --- a/docs/TODO_and_issues.md +++ b/docs/TODO_and_issues.md @@ -9,23 +9,16 @@ If you plan to contribute to sparkMeasure development, please start by reviewing on the user to validate the output. * TODO: Task metrics values collected by sparkMeasure are only for successfully executed tasks. Note that resources used by failed tasks are not collected in the current version. Can this be improved? - * We can expected more task metrics being added in future versions. Current code is not version aware and does - not offer and easy way to handle additional metrics only for newer versions without breaking backward compatibility. - TODO: implement Spark version awareness and custom list of metrics in sparkMeasure - + Following [SPARK PR 18249](https://github.com/apache/spark/pull/18249/files) add support for the metric remoteBytesReadToDisk Task Metric (this is relevant for Spark 2.3.x and above). * TODO: Flight recorder mode, task metrics, find ways to write metrics out to output files incrementally, rather than using the current approach of buffering everything in memory and writing at the end? The current approach has obvious scalability issues. - * TODO: write more tests to be executed by travis CI + * TODO: write more tests to be executed by GitHub CI actions * TODO: add code/exceptions to handle error conditions that can arise in sparkMeasure code * TODO: add more statistics related to job execution, for example report start/min/max.number of executors the job had, which is useful in the case of yarn with spark dynamic allocation * TODO (maybe): add additional sinks for the collected metrics and aggregations besides prometheus, two possible candidates are Kafka and InfluxDB - * ~~TODO (maybe): remove _updatedBlockStatuses from the list of metrics collected by spakMeasure - This follows [SPARK PR 18162](https://github.com/apache/spark/pull/18162) - TaskMetrics._updatedBlockStatuses is off by default.~~ * TODO (maybe) implement in sparkMeasure the removeSparkListener method, to allow stopping data collection from sparkMeasure. (note this is only possible from Spark versions 2.2 and above) * gatherAccumulables=true for taskMetrics(sparkSession: SparkSession, gatherAccumulables: Boolean) diff --git a/src/main/scala/ch/cern/sparkmeasure/influxdbsink.scala b/src/main/scala/ch/cern/sparkmeasure/influxdbsink.scala index 12ba99e..badff3d 100644 --- a/src/main/scala/ch/cern/sparkmeasure/influxdbsink.scala +++ b/src/main/scala/ch/cern/sparkmeasure/influxdbsink.scala @@ -123,7 +123,6 @@ class InfluxDBSink(conf: SparkConf) extends SparkListener { if (logStageMetrics) { val taskmetrics = stageCompleted.stageInfo.taskMetrics - // TODO: add all the available metrics val point2 = Point.measurement("stage_metrics") .tag("applicationId", appId) .time(completionTime, TimeUnit.MILLISECONDS) diff --git a/src/test/scala/ch/cern/sparkmeasure/UtilsTest.scala b/src/test/scala/ch/cern/sparkmeasure/UtilsTest.scala index 3a13431..21b70a2 100644 --- a/src/test/scala/ch/cern/sparkmeasure/UtilsTest.scala +++ b/src/test/scala/ch/cern/sparkmeasure/UtilsTest.scala @@ -12,12 +12,13 @@ class UtilsTest extends FlatSpec with Matchers { submissionTime = 10, completionTime = 11, stageDuration = 12, numTasks = 13, executorRunTime = 14, executorCpuTime = 15, executorDeserializeTime = 16, executorDeserializeCpuTime = 17, - resultSerializationTime = 18, jvmGCTime = 19, resultSize = 20, numUpdatedBlockStatuses = 21, + resultSerializationTime = 18, jvmGCTime = 19, resultSize = 20, diskBytesSpilled = 30, memoryBytesSpilled = 31, peakExecutionMemory = 32, recordsRead = 33, bytesRead = 34, recordsWritten = 35, bytesWritten = 36, shuffleFetchWaitTime = 40, shuffleTotalBytesRead = 41, shuffleTotalBlocksFetched = 42, - shuffleLocalBlocksFetched = 43, shuffleRemoteBlocksFetched = 44, shuffleWriteTime = 45, - shuffleBytesWritten = 46, shuffleRecordsWritten = 47 + shuffleLocalBlocksFetched = 43, shuffleRemoteBlocksFetched = 44, shuffleLocalBytesRead = 45, + shuffleRemoteBytesRead = 46, shuffleRemoteBytesReadToDisk = 47, shuffleRecordsRead = 48, + shuffleWriteTime = 50, shuffleBytesWritten = 51, shuffleRecordsWritten = 52 ) val taskVals0 = TaskVals(jobId = 1, jobGroup = "test", stageId = 2, index = 3, launchTime = 4, finishTime = 5, @@ -25,12 +26,13 @@ class UtilsTest extends FlatSpec with Matchers { speculative = false, gettingResultTime = 12, successful = true, executorRunTime = 14, executorCpuTime = 15, executorDeserializeTime = 16, executorDeserializeCpuTime = 17, - resultSerializationTime = 18, jvmGCTime = 19, resultSize = 20, numUpdatedBlockStatuses = 21, + resultSerializationTime = 18, jvmGCTime = 19, resultSize = 20, diskBytesSpilled = 30, memoryBytesSpilled = 31, peakExecutionMemory = 32, recordsRead = 33, bytesRead = 34, recordsWritten = 35, bytesWritten = 36, shuffleFetchWaitTime = 40, shuffleTotalBytesRead = 41, shuffleTotalBlocksFetched = 42, - shuffleLocalBlocksFetched = 43, shuffleRemoteBlocksFetched = 44, shuffleWriteTime = 45, - shuffleBytesWritten = 46, shuffleRecordsWritten = 47 + shuffleLocalBlocksFetched = 43, shuffleRemoteBlocksFetched = 44, shuffleLocalBytesRead = 45, + shuffleRemoteBytesRead = 46, shuffleRemoteBytesReadToDisk = 47, shuffleRecordsRead = 48, + shuffleWriteTime = 50, shuffleBytesWritten = 51, shuffleRecordsWritten = 52 ) it should "write and read back StageVal (Java Serialization)" in {