Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 57 additions & 52 deletions core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,24 @@
package com.cloudera.spark

import java.lang.management._
import java.math.{RoundingMode, MathContext}
import java.math.{MathContext, RoundingMode}
import java.text.SimpleDateFormat
import java.util.Locale
import java.util.concurrent.atomic.{AtomicInteger, AtomicBoolean, AtomicReference}
import java.util.concurrent._

import scala.collection.JavaConverters._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}

import com.quantifind.sumac.FieldArgs

import org.apache.spark.{TaskContext, SparkContext}
import com.typesafe.scalalogging.Logger
import org.apache.spark.executor.ExecutorPlugin
import org.apache.spark.memory.SparkMemoryManagerHandle
import org.apache.spark.{SparkContext, TaskContext}

import scala.collection.JavaConverters._

class MemoryMonitor(val args: MemoryMonitorArgs) {

private[this] val log = Logger(this.getClass)

val nettyMemoryHandle = SparkNettyMemoryHandle.get()
val sparkMemManagerHandle = SparkMemoryManagerHandle.get()
val memoryBean = ManagementFactory.getMemoryMXBean
Expand Down Expand Up @@ -59,8 +62,8 @@ class MemoryMonitor(val args: MemoryMonitorArgs) {
val inShutdown = new AtomicBoolean(false)

def showMetricNames: Unit = {
println(s"${nMetrics} Metrics")
(0 until nMetrics).foreach { idx => println(names(idx))}
log.info(s"${nMetrics} Metrics")
(0 until nMetrics).foreach { idx => log.info(names(idx))}
}

def collectSnapshot: MemorySnapshot = {
Expand All @@ -81,41 +84,39 @@ class MemoryMonitor(val args: MemoryMonitorArgs) {
}

def showSnapshot(mem: MemorySnapshot): Unit = {
println(s"Mem usage at ${MemoryMonitor.dateFormat.format(mem.time)}")
println("===============")
log.info(s"Mem usage at ${MemoryMonitor.dateFormat.format(mem.time)}")
log.info("===============")
// TODO headers for each getter?
(0 until nMetrics).foreach { idx =>
val v = mem.values(idx)
println(names(idx) + "\t:" + MemoryMonitor.bytesToString(v) + "(" + v + ")")
log.info(names(idx) + "\t:" + MemoryMonitor.bytesToString(v) + "(" + v + ")")
}
println()
println()
log.info("")
log.info("")
}

def updateAndMaybeShowPeaks(): Unit = {
val snapshot = collectSnapshot
if (peakMemoryUsage.update(snapshot, peakUpdates, reporting)) {
showUpdates(snapshot.time, peakMemoryUsage, peakUpdates)
}
if (peakMemoryUsage.update(snapshot, peakUpdates, reporting)) showUpdates(snapshot.time, peakMemoryUsage, peakUpdates)
}

def showUpdates(time: Long, peakMemory: MemoryPeaks, updates: PeakUpdate): Unit = {
println(s"Peak Memory updates:${MemoryMonitor.dateFormat.format(time)}")
log.info(s"Peak Memory updates:${MemoryMonitor.dateFormat.format(time)}")
(0 until updates.nUpdates).foreach { updateIdx =>
val metricIdx = updates.updateIdx(updateIdx)
val name = names(metricIdx)
val currentVal = MemoryMonitor.bytesToString(peakMemoryUsage.values(metricIdx))
val rawDelta = updates.delta(updateIdx)
val delta = (if (rawDelta > 0) "+" else "-") + MemoryMonitor.bytesToString(rawDelta)
println(s"$name\t:\t$currentVal ($delta)")
log.info(s"$name\t:\t$currentVal ($delta)")
}
}

def showPeaks(time: Long): Unit = {
println(s"Peak Memory usage so far ${MemoryMonitor.dateFormat.format(time)}")
log.info(s"Peak Memory usage so far ${MemoryMonitor.dateFormat.format(time)}")
// TODO headers for each getter?
(0 until nMetrics).foreach { idx =>
println(names(idx) + "\t:" + MemoryMonitor.bytesToString(peakMemoryUsage.values(idx)) +
log.info(names(idx) + "\t:" + MemoryMonitor.bytesToString(peakMemoryUsage.values(idx)) +
"\t\t\t\t" + MemoryMonitor.dateFormat.format(peakMemoryUsage.peakTimes(idx)))
}
}
Expand All @@ -127,7 +128,7 @@ class MemoryMonitor(val args: MemoryMonitorArgs) {
def showLastThreadDump: Unit = {
val threads = lastThreadDump.get()
if (threads != null) {
println("last thread dump:")
log.info("last thread dump:")
MemoryMonitor.showThreadDump(threads)
}
}
Expand All @@ -136,14 +137,14 @@ class MemoryMonitor(val args: MemoryMonitorArgs) {
Runtime.getRuntime.addShutdownHook(new Thread(){
override def run(): Unit = {
inShutdown.set(true)
println()
println("IN SHUTDOWN")
println("================")
log.info("")
log.info("IN SHUTDOWN")
log.info("================")
val snapshot = collectSnapshot
showSnapshot(snapshot)
peakMemoryUsage.update(snapshot, peakUpdates, reporting)
showPeaks(snapshot.time)
println("Last non-shutdown snapshot:")
log.info("Last non-shutdown snapshot:")
showSnapshot(lastNonShutdownSnapshot.get())

showLastThreadDump
Expand All @@ -154,32 +155,33 @@ class MemoryMonitor(val args: MemoryMonitorArgs) {
def beanInfo(): Unit = {

memMgrBeans.foreach { mgr =>
println(mgr.getName + " is managing " + mgr.getMemoryPoolNames.mkString(","))
log.info(mgr.getName + " is managing " + mgr.getMemoryPoolNames.mkString(","))
}

poolBeans.foreach { pool =>
println(pool.getName())
println("============")
println(pool.getName() + " is managed by " + pool.getMemoryManagerNames.mkString(","))
log.info(pool.getName())
log.info("============")
log.info(pool.getName() + " is managed by " + pool.getMemoryManagerNames.mkString(","))
if (pool.isUsageThresholdSupported)
println("supports usage threshold")
log.info("supports usage threshold")
if (pool.isCollectionUsageThresholdSupported)
println("supports collection usage threshold")
log.info("supports collection usage threshold")
pool.getUsage
println()
println()
log.info("")
log.info("")
}

println("BUFFER POOLS")
log.info("BUFFER POOLS")
bufferPoolsBeans.foreach { bp =>
println(s"${bp.getName}: ${bp.getMemoryUsed} / ${bp.getTotalCapacity}")
log.info(s"${bp.getName}: ${bp.getMemoryUsed} / ${bp.getTotalCapacity}")
}
}
}

object MemoryMonitor {

val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
private[this] val log = Logger(this.getClass)

private var monitor: MemoryMonitor = null
private var shutdownHookInstalled = false
Expand Down Expand Up @@ -222,16 +224,16 @@ object MemoryMonitor {
def listAllMBeans: Unit = {
val server = ManagementFactory.getPlatformMBeanServer
val allBeans = server.queryNames(null, null)
println("ALL BEANS")
println("=============")
allBeans.asScala.map{_.toString}.toArray.sorted.foreach { ob => println(ob) }
println()
println()
log.info("ALL BEANS")
log.info("=============")
allBeans.asScala.map{_.toString}.toArray.sorted.foreach { ob => log.info(ob) }
log.info("")
log.info("")
}

def showLimits: Unit = {
println("sun.misc.VM.maxDirectMemory(): " + sun.misc.VM.maxDirectMemory())
println("Runtime.getRuntime.maxMemory(): " + Runtime.getRuntime.maxMemory())
log.info("sun.misc.VM.maxDirectMemory(): " + sun.misc.VM.maxDirectMemory())
log.info("Runtime.getRuntime.maxMemory(): " + Runtime.getRuntime.maxMemory())
}

def installIfSysProps(): Unit = {
Expand All @@ -240,9 +242,9 @@ object MemoryMonitor {
install(args)
installShutdownHook()
args.freq.foreach { freq =>
println(s"POLLING memory monitor every $freq millis")
log.info(s"POLLING memory monitor every $freq millis")
monitor.showCurrentMemUsage
println("done with initial show")
log.info("done with initial show")
startPolling(args)
}
}
Expand All @@ -252,7 +254,7 @@ object MemoryMonitor {
if (!SparkMemoryManagerHandle.isDynamicAllocation(sc)) {
installOnExecutors(sc)
} else {
println ("********* WARNING ***** not installing on executors because of DA")
log.info ("********* WARNING ***** not installing on executors because of DA")
}
}

Expand All @@ -263,7 +265,7 @@ object MemoryMonitor {
} else {
numTasks
}
println(s"Running $t tasks to install memory monitor on executors")
log.info(s"Running $t tasks to install memory monitor on executors")
sc.parallelize(1 to t, t).foreach { _ =>
Thread.sleep(sleep)
installIfSysProps()
Expand Down Expand Up @@ -317,17 +319,19 @@ object MemoryMonitor {
def showThreadDump(threads: Array[ThreadInfo]): Unit = {
threads.foreach { t =>
if (t == null) {
println("<null thread>")
log.info("<null thread>")
} else {
println(t.getThreadId + " " + t.getThreadName + " " + t.getThreadState)
t.getStackTrace.foreach { elem => println("\t" + elem) }
println()
log.info(t.getThreadId + " " + t.getThreadName + " " + t.getThreadState)
t.getStackTrace.foreach { elem => log.info("\t" + elem) }
log.info("")
}
}
}
}

class MemoryMonitorExecutorExtension extends ExecutorPlugin {
private[this] val log = Logger(this.getClass)

// the "extension class" api just lets you invoke a constructor. We really just want to
// call this static method, so that's good enough.
MemoryMonitor.installIfSysProps()
Expand Down Expand Up @@ -356,7 +360,7 @@ class MemoryMonitorExecutorExtension extends ExecutorPlugin {
val task = scheduler.scheduleWithFixedDelay(new Runnable {
override def run(): Unit = {
val d = MemoryMonitor.dateFormat.format(System.currentTimeMillis())
println(s"Polled thread dump @ $d")
log.info(s"Polled thread dump @ $d")
MemoryMonitor.showThreadDump(MemoryMonitor.getThreadInfo)
}
}, 0, args.threadDumpFreqMillis, TimeUnit.MILLISECONDS)
Expand Down Expand Up @@ -397,6 +401,7 @@ class MemoryMonitorArgs extends FieldArgs {
}

object MemoryMonitorArgs {
private[this] val log = Logger(this.getClass)
val prefix = "memory.monitor."
val prefixLen = prefix.length

Expand All @@ -406,7 +411,7 @@ object MemoryMonitorArgs {
k.substring(prefixLen) -> v
})
if (args.stagesToPoll != null && args.stagesToPoll.nonEmpty) {
System.out.println(s"will poll thread dumps for stages ${args.stagesToPoll.mkString(",")}")
log.info(s"will poll thread dumps for stages ${args.stagesToPoll.mkString(",")}")
} else {
args.stagesToPoll = Array()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.apache.spark.memory

import com.cloudera.spark.{Reflector, IncrementBytes, MemoryGetter}
import org.apache.spark.util.{Utils, ThreadStackTrace}
import com.cloudera.spark.{IncrementBytes, MemoryGetter, Reflector}
import org.apache.spark.{SparkContext, SparkEnv}

class SparkMemoryManagerHandle(
Expand Down
Loading