diff --git a/Config b/Config new file mode 100644 index 0000000..8ab9ce1 --- /dev/null +++ b/Config @@ -0,0 +1,37 @@ +# -*-perl-*- + +package.Spark-memory = { + interfaces = (1.0); + + deploy = { + generic = true; + }; + + build-environment = { + chroot = basic; + network-access = blocked; + }; + + # Use NoOpBuild. See https://w.amazon.com/index.php/BrazilBuildSystem/NoOpBuild + build-system = no-op; + build-tools = { + 1.0 = { + NoOpBuild = 1.0; + }; + }; + + # Use runtime-dependencies for when you want to bring in additional + # packages when deploying. + # Use dependencies instead if you intend for these dependencies to + # be exported to other packages that build against you. + dependencies = { + 1.0 = { + }; + }; + + runtime-dependencies = { + 1.0 = { + }; + }; + +}; diff --git a/README.md b/README.md index 776cefe..969a3fd 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,23 @@ Spark Memory Monitor ======================== +Intro +---------- +Memory management is a complicated system in spark that often leads to confusing and difficult to diagnose problems. Typically the user gets an error message that shows that YARN has killed an executor for exceeding physical memory limits but gives no insight into why that is. Furthermore, the typical tools used to investigate java memory use are ineffective due to the extensive use of off-heap memory in spark. This results in a portion of the process memory being “unaccounted for”. spark-memory is a tool to provide insight into the parts of the code that are not typically visible giving insight into the max memory use. -Usage +> ExecutorLostFailure (executor 19 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 5.6 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714. 
+ +Example yarn error message + + +Building spark-memory ----------- Build with `mvn package`, `sbt`, etc. + +Modifying spark-submit to use spark-memory +----------- + Include that jar in your spark application. You could bundle it directly, or just include it with `--jars`. The monitoring is configured via java system properties: diff --git a/core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala b/core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala index 9a85c11..6ee6d53 100644 --- a/core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala +++ b/core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala @@ -327,60 +327,7 @@ object MemoryMonitor { } } -class MemoryMonitorExecutorExtension extends ExecutorPlugin { - // the "extension class" api just lets you invoke a constructor. We really just want to - // call this static method, so that's good enough. - MemoryMonitor.installIfSysProps() - val args = MemoryMonitorArgs.sysPropsArgs - - val monitoredTaskCount = new AtomicInteger(0) - - val scheduler = if (args.stagesToPoll != null && args.stagesToPoll.nonEmpty) { - // TODO share polling executors? 
- new ScheduledThreadPoolExecutor(1, new ThreadFactory { - override def newThread(r: Runnable): Thread = { - val t = new Thread(r, "thread-dump poll thread") - t.setDaemon(true) - t - } - }) - } else { - null - } - val pollingTask = new AtomicReference[ScheduledFuture[_]]() - - override def taskStart(taskContext: TaskContext): Unit = { - if (args.stagesToPoll.contains(taskContext.stageId())) { - if (monitoredTaskCount.getAndIncrement() == 0) { - // TODO schedule thread polling - val task = scheduler.scheduleWithFixedDelay(new Runnable { - override def run(): Unit = { - val d = MemoryMonitor.dateFormat.format(System.currentTimeMillis()) - println(s"Polled thread dump @ $d") - MemoryMonitor.showThreadDump(MemoryMonitor.getThreadInfo) - } - }, 0, args.threadDumpFreqMillis, TimeUnit.MILLISECONDS) - pollingTask.set(task) - } - } - } - - override def onTaskFailure(context: TaskContext, error: Throwable): Unit = { - removeActiveTask(context) - } - - override def onTaskCompletion(context: TaskContext): Unit = { - removeActiveTask(context) - } - private def removeActiveTask(context: TaskContext): Unit = { - if (args.stagesToPoll.contains(context.stageId())) { - if (monitoredTaskCount.decrementAndGet() == 0) { - pollingTask.get().cancel(false) - } - } - } -} class MemoryMonitorArgs extends FieldArgs { var enabled = false diff --git a/core/src/main/scala/com/cloudera/spark/MemoryMonitorExecutorExtension.scala b/core/src/main/scala/com/cloudera/spark/MemoryMonitorExecutorExtension.scala new file mode 100644 index 0000000..69b52ae --- /dev/null +++ b/core/src/main/scala/com/cloudera/spark/MemoryMonitorExecutorExtension.scala @@ -0,0 +1,62 @@ +package com.cloudera.spark + +import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} +import java.util.concurrent.{ScheduledFuture, ScheduledThreadPoolExecutor, ThreadFactory, TimeUnit} + +import org.apache.spark.TaskContext +import org.apache.spark.executor.ExecutorPlugin + +class MemoryMonitorExecutorExtension extends 
ExecutorPlugin with org.apache.spark.ExecutorPlugin { + // the "extension class" api just lets you invoke a constructor. We really just want to + // call this static method, so that's good enough. + MemoryMonitor.installIfSysProps() + val args = MemoryMonitorArgs.sysPropsArgs + + val monitoredTaskCount = new AtomicInteger(0) + + val scheduler = if (args.stagesToPoll != null && args.stagesToPoll.nonEmpty) { + // TODO share polling executors? + new ScheduledThreadPoolExecutor(1, new ThreadFactory { + override def newThread(r: Runnable): Thread = { + val t = new Thread(r, "thread-dump poll thread") + t.setDaemon(true) + t + } + }) + } else { + null + } + val pollingTask = new AtomicReference[ScheduledFuture[_]]() + + override def taskStart(taskContext: TaskContext): Unit = { + if (args.stagesToPoll.contains(taskContext.stageId())) { + if (monitoredTaskCount.getAndIncrement() == 0) { + // TODO schedule thread polling + val task = scheduler.scheduleWithFixedDelay(new Runnable { + override def run(): Unit = { + val d = MemoryMonitor.dateFormat.format(System.currentTimeMillis()) + println(s"Polled thread dump @ $d") + MemoryMonitor.showThreadDump(MemoryMonitor.getThreadInfo) + } + }, 0, args.threadDumpFreqMillis, TimeUnit.MILLISECONDS) + pollingTask.set(task) + } + } + } + + override def onTaskFailure(context: TaskContext, error: Throwable): Unit = { + removeActiveTask(context) + } + + override def onTaskCompletion(context: TaskContext): Unit = { + removeActiveTask(context) + } + + private def removeActiveTask(context: TaskContext): Unit = { + if (args.stagesToPoll.contains(context.stageId())) { + if (monitoredTaskCount.decrementAndGet() == 0) { + pollingTask.get().cancel(false) + } + } + } +} diff --git a/core/src/main/scala/org/apache/spark/memory/SparkMemoryManagerHandle.scala b/core/src/main/scala/org/apache/spark/memory/SparkMemoryManagerHandle.scala index 387d7a8..63ef183 100644 --- a/core/src/main/scala/org/apache/spark/memory/SparkMemoryManagerHandle.scala +++ 
b/core/src/main/scala/org/apache/spark/memory/SparkMemoryManagerHandle.scala @@ -1,7 +1,6 @@ package org.apache.spark.memory import com.cloudera.spark.{Reflector, IncrementBytes, MemoryGetter} -import org.apache.spark.util.{Utils, ThreadStackTrace} import org.apache.spark.{SparkContext, SparkEnv} class SparkMemoryManagerHandle( diff --git a/core/src/test/scala/com/cloudera/spark/MemoryMonitorExecutorExtension.scala b/core/src/test/scala/com/cloudera/spark/MemoryMonitorExecutorExtension.scala new file mode 100644 index 0000000..8580b87 --- /dev/null +++ b/core/src/test/scala/com/cloudera/spark/MemoryMonitorExecutorExtension.scala @@ -0,0 +1,10 @@ +package com.cloudera.spark + +import org.scalatest.FunSuite + +class MemoryMonitorExecutorExtensionSuite extends FunSuite { + test("MemoryMonitorExecutorExtension should extend the correct class of ExecutorPlugin") { + assert(classOf[MemoryMonitorExecutorExtension].getInterfaces.contains(classOf[org.apache.spark.executor.ExecutorPlugin])) + assert(classOf[MemoryMonitorExecutorExtension].getInterfaces.contains(classOf[org.apache.spark.ExecutorPlugin])) + } +} diff --git a/pom.xml b/pom.xml index 019475d..6c1f041 100644 --- a/pom.xml +++ b/pom.xml @@ -36,7 +36,6 @@ sumac_2.11 0.3.0 - org.scala-lang scala-library @@ -46,10 +45,9 @@ org.apache.spark spark-core_2.11 - 2.3.0 + 2.4.3 provided - org.scalatest scalatest_2.11