
Commit 053ef94

Merge ShufflePerfTester patch into shuffle block consolidation
2 parents 4aa0ba1 + a51359c commit 053ef94

19 files changed, +551 -490 lines

core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala

Lines changed: 13 additions & 4 deletions
@@ -17,20 +17,29 @@
 
 package org.apache.hadoop.mapred
 
+private[apache]
 trait SparkHadoopMapRedUtil {
   def newJobContext(conf: JobConf, jobId: JobID): JobContext = {
-    val klass = firstAvailableClass("org.apache.hadoop.mapred.JobContextImpl", "org.apache.hadoop.mapred.JobContext");
-    val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[org.apache.hadoop.mapreduce.JobID])
+    val klass = firstAvailableClass("org.apache.hadoop.mapred.JobContextImpl",
+      "org.apache.hadoop.mapred.JobContext")
+    val ctor = klass.getDeclaredConstructor(classOf[JobConf],
+      classOf[org.apache.hadoop.mapreduce.JobID])
     ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
   }
 
   def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = {
-    val klass = firstAvailableClass("org.apache.hadoop.mapred.TaskAttemptContextImpl", "org.apache.hadoop.mapred.TaskAttemptContext")
+    val klass = firstAvailableClass("org.apache.hadoop.mapred.TaskAttemptContextImpl",
+      "org.apache.hadoop.mapred.TaskAttemptContext")
     val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[TaskAttemptID])
     ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
  }
 
-  def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = {
+  def newTaskAttemptID(
+      jtIdentifier: String,
+      jobId: Int,
+      isMap: Boolean,
+      taskId: Int,
+      attemptId: Int) = {
     new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId)
   }

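Note: both newJobContext and newTaskAttemptContext resolve whichever of the two Hadoop classes exists on the classpath through a firstAvailableClass helper. The helper's body is outside this hunk, so the following is only a minimal sketch of the idea, assuming a plain Class.forName fallback rather than the committed implementation:

    // Sketch only -- assumed shape of the helper, not the committed code.
    private def firstAvailableClass(first: String, second: String): Class[_] = {
      try {
        Class.forName(first)    // e.g. the class that only exists on newer Hadoop releases
      } catch {
        case e: ClassNotFoundException => Class.forName(second)  // fall back to the older class
      }
    }
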
core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala

Lines changed: 21 additions & 12 deletions
@@ -17,9 +17,10 @@
 
 package org.apache.hadoop.mapreduce
 
-import org.apache.hadoop.conf.Configuration
 import java.lang.{Integer => JInteger, Boolean => JBoolean}
+import org.apache.hadoop.conf.Configuration
 
+private[apache]
 trait SparkHadoopMapReduceUtil {
   def newJobContext(conf: Configuration, jobId: JobID): JobContext = {
     val klass = firstAvailableClass(
@@ -37,23 +38,31 @@ trait SparkHadoopMapReduceUtil {
     ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
   }
 
-  def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = {
-    val klass = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptID");
+  def newTaskAttemptID(
+      jtIdentifier: String,
+      jobId: Int,
+      isMap: Boolean,
+      taskId: Int,
+      attemptId: Int) = {
+    val klass = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptID")
     try {
-      // first, attempt to use the old-style constructor that takes a boolean isMap (not available in YARN)
+      // First, attempt to use the old-style constructor that takes a boolean isMap
+      // (not available in YARN)
       val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], classOf[Boolean],
-        classOf[Int], classOf[Int])
-      ctor.newInstance(jtIdentifier, new JInteger(jobId), new JBoolean(isMap), new JInteger(taskId), new
-        JInteger(attemptId)).asInstanceOf[TaskAttemptID]
+          classOf[Int], classOf[Int])
+      ctor.newInstance(jtIdentifier, new JInteger(jobId), new JBoolean(isMap), new JInteger(taskId),
+        new JInteger(attemptId)).asInstanceOf[TaskAttemptID]
     } catch {
       case exc: NoSuchMethodException => {
-        // failed, look for the new ctor that takes a TaskType (not available in 1.x)
-        val taskTypeClass = Class.forName("org.apache.hadoop.mapreduce.TaskType").asInstanceOf[Class[Enum[_]]]
-        val taskType = taskTypeClass.getMethod("valueOf", classOf[String]).invoke(taskTypeClass, if(isMap) "MAP" else "REDUCE")
+        // If that failed, look for the new constructor that takes a TaskType (not available in 1.x)
+        val taskTypeClass = Class.forName("org.apache.hadoop.mapreduce.TaskType")
+          .asInstanceOf[Class[Enum[_]]]
+        val taskType = taskTypeClass.getMethod("valueOf", classOf[String]).invoke(
+          taskTypeClass, if(isMap) "MAP" else "REDUCE")
        val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], taskTypeClass,
           classOf[Int], classOf[Int])
-        ctor.newInstance(jtIdentifier, new JInteger(jobId), taskType, new JInteger(taskId), new
-          JInteger(attemptId)).asInstanceOf[TaskAttemptID]
+        ctor.newInstance(jtIdentifier, new JInteger(jobId), taskType, new JInteger(taskId),
+          new JInteger(attemptId)).asInstanceOf[TaskAttemptID]
       }
     }
   }

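Because the constructor lookup is reflective, callers of this trait never reference TaskType or the boolean-isMap constructor directly, so one build runs against both Hadoop 1.x and YARN. A hypothetical caller could look like the sketch below; the class name and id values are invented for illustration, and the caller must live under org.apache now that the trait is private[apache]:

    package org.apache.spark.example   // any package under org.apache can see the trait

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapreduce.{SparkHadoopMapReduceUtil, TaskAttemptContext}

    class ExampleContextFactory extends SparkHadoopMapReduceUtil {
      def firstMapAttempt(conf: Configuration): TaskAttemptContext = {
        // Resolved through whichever TaskAttemptID constructor the Hadoop on the classpath has.
        val id = newTaskAttemptID("example-jt", jobId = 1, isMap = true, taskId = 0, attemptId = 0)
        newTaskAttemptContext(conf, id)
      }
    }
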
core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 40 additions & 23 deletions
@@ -51,25 +51,20 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
 
 import org.apache.mesos.MesosNativeLibrary
 
-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.LocalSparkCluster
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkDeploySchedulerBackend,
-  ClusterScheduler}
-import org.apache.spark.scheduler.local.LocalScheduler
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
+  SparkDeploySchedulerBackend, ClusterScheduler}
 import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
-import org.apache.spark.storage.{StorageUtils, BlockManagerSource}
+import org.apache.spark.scheduler.local.LocalScheduler
+import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util._
-import org.apache.spark.scheduler.StageInfo
-import org.apache.spark.storage.RDDInfo
-import org.apache.spark.storage.StorageStatus
-import scala.Some
-import org.apache.spark.scheduler.StageInfo
-import org.apache.spark.storage.RDDInfo
-import org.apache.spark.storage.StorageStatus
+import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType,
+  TimeStampedHashMap, Utils}
+
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -125,7 +120,7 @@ class SparkContext(
   private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]
   private[spark] val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup)
 
-  // Initalize the Spark UI
+  // Initialize the Spark UI
   private[spark] val ui = new SparkUI(this)
   ui.bind()
 
@@ -161,8 +156,8 @@ class SparkContext(
   val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
   // Regular expression for connecting to Spark deploy clusters
   val SPARK_REGEX = """spark://(.*)""".r
-  //Regular expression for connection to Mesos cluster
-  val MESOS_REGEX = """(mesos://.*)""".r
+  // Regular expression for connection to Mesos cluster
+  val MESOS_REGEX = """mesos://(.*)""".r
 
   master match {
     case "local" =>
@@ -292,11 +287,31 @@ class SparkContext(
     setJobGroup("", value)
   }
 
+  /**
+   * Assigns a group id to all the jobs started by this thread until the group id is set to a
+   * different value or cleared.
+   *
+   * Often, a unit of execution in an application consists of multiple Spark actions or jobs.
+   * Application programmers can use this method to group all those jobs together and give a
+   * group description. Once set, the Spark web UI will associate such jobs with this group.
+   *
+   * The application can also use [[org.apache.spark.SparkContext.cancelJobGroup]] to cancel all
+   * running jobs in this group. For example,
+   * {{{
+   * // In the main thread:
+   * sc.setJobGroup("some_job_to_cancel", "some job description")
+   * sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
+   *
+   * // In a separate thread:
+   * sc.cancelJobGroup("some_job_to_cancel")
+   * }}}
+   */
   def setJobGroup(groupId: String, description: String) {
     setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, description)
     setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
   }
 
+  /** Clear the job group id and its description. */
   def clearJobGroup() {
     setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, null)
     setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, null)
@@ -305,8 +320,8 @@ class SparkContext(
   // Post init
   taskScheduler.postStartHook()
 
-  val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this)
-  val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this)
+  private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this)
+  private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this)
 
   def initDriverMetrics() {
     SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
@@ -881,13 +896,15 @@ class SparkContext(
     new SimpleFutureAction(waiter, resultFunc)
   }
 
+  /**
+   * Cancel active jobs for the specified group. See [[org.apache.spark.SparkContext.setJobGroup]]
+   * for more information.
+   */
   def cancelJobGroup(groupId: String) {
     dagScheduler.cancelJobGroup(groupId)
   }
 
-  /**
-   * Cancel all jobs that have been scheduled or are running.
-   */
+  /** Cancel all jobs that have been scheduled or are running. */
   def cancelAllJobs() {
     dagScheduler.cancelAllJobs()
   }
@@ -949,9 +966,9 @@
  */
 object SparkContext {
 
-  val SPARK_JOB_DESCRIPTION = "spark.job.description"
+  private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"
 
-  val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
+  private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
 
   implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
     def addInPlace(t1: Double, t2: Double): Double = t1 + t2

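As the hunks above show, setJobGroup is a thin wrapper over setLocalProperty: the description and group id are thread-local properties that travel with every job the calling thread submits, and cancelJobGroup later matches on the group id. Roughly, in user code (illustration only, assuming an existing SparkContext named sc):

    // Equivalent in effect to sc.setJobGroup("some_job_to_cancel", "some job description");
    // the two property keys are the ones made private[spark] in this commit.
    sc.setLocalProperty("spark.job.description", "some job description")
    sc.setLocalProperty("spark.jobGroup.id", "some_job_to_cancel")

    // ...later, from any other thread:
    sc.cancelJobGroup("some_job_to_cancel")
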
core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala

Lines changed: 8 additions & 8 deletions
@@ -17,14 +17,14 @@
 
 package org.apache.hadoop.mapred
 
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.fs.Path
-
+import java.io.IOException
 import java.text.SimpleDateFormat
 import java.text.NumberFormat
-import java.io.IOException
 import java.util.Date
 
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.Logging
 import org.apache.spark.SerializableWritable
@@ -36,6 +36,7 @@ import org.apache.spark.SerializableWritable
 * Saves the RDD using a JobConf, which should contain an output key class, an output value class,
 * a filename to write to, etc, exactly like in a Hadoop MapReduce job.
 */
+private[apache]
 class SparkHadoopWriter(@transient jobConf: JobConf)
   extends Logging
   with SparkHadoopMapRedUtil
@@ -86,13 +87,11 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
   }
 
     getOutputCommitter().setupTask(getTaskContext())
-    writer = getOutputFormat().getRecordWriter(
-      fs, conf.value, outputName, Reporter.NULL)
+    writer = getOutputFormat().getRecordWriter(fs, conf.value, outputName, Reporter.NULL)
   }
 
   def write(key: AnyRef, value: AnyRef) {
-    if (writer!=null) {
-      //println (">>> Writing ("+key.toString+": " + key.getClass.toString + ", " + value.toString + ": " + value.getClass.toString + ")")
+    if (writer != null) {
       writer.write(key, value)
     } else {
       throw new IOException("Writer is null, open() has not been called")
@@ -182,6 +181,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
   }
 }
 
+private[apache]
 object SparkHadoopWriter {
   def createJobID(time: Date, id: Int): JobID = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")

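The null check in write() encodes an ordering constraint: open() must run before any write(), otherwise the IOException above is thrown. A hypothetical sketch of that ordering only (jobConf, key and value assumed to exist elsewhere; other lifecycle calls of the writer are omitted, and a real caller must sit under org.apache now that the class is private[apache]):

    val hadoopWriter = new SparkHadoopWriter(jobConf)
    hadoopWriter.open()               // must come first, or write() throws IOException
    hadoopWriter.write(key, value)
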
core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 1 addition & 1 deletion
@@ -308,7 +308,7 @@ private class BytesToString extends org.apache.spark.api.java.function.Function[
 * Internal class that acts as an `AccumulatorParam` for Python accumulators. Inside, it
 * collects a list of pickled strings that we pass to Python through a socket.
 */
-class PythonAccumulatorParam(@transient serverHost: String, serverPort: Int)
+private class PythonAccumulatorParam(@transient serverHost: String, serverPort: Int)
   extends AccumulatorParam[JList[Array[Byte]]] {
 
   Utils.checkHost(serverHost, "Expected hostname")

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 12 additions & 7 deletions
@@ -17,26 +17,31 @@
 
 package org.apache.spark.deploy
 
-import com.google.common.collect.MapMaker
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 
+import com.google.common.collect.MapMaker
+
 
 /**
- * Contains util methods to interact with Hadoop from spark.
+ * Contains util methods to interact with Hadoop from Spark.
 */
+private[spark]
 class SparkHadoopUtil {
   // A general, soft-reference map for metadata needed during HadoopRDD split computation
   // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
   private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
 
-  // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop
-  // subsystems
+  /**
+   * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
+   * subsystems.
+   */
  def newConfiguration(): Configuration = new Configuration()
 
-  // Add any user credentials to the job conf which are necessary for running on a secure Hadoop
-  // cluster
+  /**
+   * Add any user credentials to the job conf which are necessary for running on a secure Hadoop
+   * cluster.
+   */
  def addCredentials(conf: JobConf) {}
 
  def isYarnMode(): Boolean = { false }

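newConfiguration, addCredentials and isYarnMode are deliberately trivial here so that a YARN deployment can swap in a subclass. A hypothetical subclass, for illustration only (the name and behavior are invented, not the actual YARN implementation), might look like:

    package org.apache.spark.example   // must be under org.apache.spark, since the class is private[spark]

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapred.JobConf
    import org.apache.spark.deploy.SparkHadoopUtil

    // Hypothetical sketch only.
    class ExampleYarnHadoopUtil extends SparkHadoopUtil {
      // Could load YARN-specific configuration resources here.
      override def newConfiguration(): Configuration = new Configuration()

      override def addCredentials(conf: JobConf) {
        // e.g. copy the current user's delegation tokens into the JobConf
      }

      override def isYarnMode(): Boolean = true
    }
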
core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 23 additions & 20 deletions
@@ -74,30 +74,33 @@ private[spark] class Executor(
   private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
   Thread.currentThread.setContextClassLoader(replClassLoader)
 
-  // Make any thread terminations due to uncaught exceptions kill the entire
-  // executor process to avoid surprising stalls.
-  Thread.setDefaultUncaughtExceptionHandler(
-    new Thread.UncaughtExceptionHandler {
-      override def uncaughtException(thread: Thread, exception: Throwable) {
-        try {
-          logError("Uncaught exception in thread " + thread, exception)
-
-          // We may have been called from a shutdown hook. If so, we must not call System.exit().
-          // (If we do, we will deadlock.)
-          if (!Utils.inShutdown()) {
-            if (exception.isInstanceOf[OutOfMemoryError]) {
-              System.exit(ExecutorExitCode.OOM)
-            } else {
-              System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
+  if (!isLocal) {
+    // Setup an uncaught exception handler for non-local mode.
+    // Make any thread terminations due to uncaught exceptions kill the entire
+    // executor process to avoid surprising stalls.
+    Thread.setDefaultUncaughtExceptionHandler(
+      new Thread.UncaughtExceptionHandler {
+        override def uncaughtException(thread: Thread, exception: Throwable) {
+          try {
+            logError("Uncaught exception in thread " + thread, exception)
+
+            // We may have been called from a shutdown hook. If so, we must not call System.exit().
+            // (If we do, we will deadlock.)
+            if (!Utils.inShutdown()) {
+              if (exception.isInstanceOf[OutOfMemoryError]) {
+                System.exit(ExecutorExitCode.OOM)
+              } else {
+                System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
+              }
             }
+          } catch {
+            case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
+            case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
           }
-        } catch {
-          case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
-          case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
         }
       }
-    }
-  )
+    )
+  }
 
   val executorSource = new ExecutorSource(this, executorId)

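Guarding the handler with if (!isLocal) matters because a local-mode executor shares its JVM with the driver, so calling System.exit there would also take down the user application or test suite. For reference, the bare JDK pattern being wrapped, shown in isolation (standalone illustration, not Spark code):

    Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      override def uncaughtException(t: Thread, e: Throwable) {
        System.err.println("Uncaught exception in thread " + t.getName + ": " + e)
        // halt() skips shutdown hooks, which is why the catch branches above prefer it
        // once exiting cleanly is no longer safe.
        Runtime.getRuntime.halt(1)
      }
    })
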
core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala

Lines changed: 5 additions & 0 deletions
@@ -102,4 +102,9 @@ class ShuffleWriteMetrics extends Serializable {
   * Number of bytes written for a shuffle
   */
  var shuffleBytesWritten: Long = _
+
+  /**
+   * Time spent blocking on writes to disk or buffer cache, in nanoseconds.
+   */
+  var shuffleWriteTime: Long = _
 }

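Since shuffleWriteTime is in nanoseconds, a writer is expected to bracket each blocking write with System.nanoTime and add the delta alongside the byte count. A sketch of how the two fields could be fed (the write function here is an invented stand-in, not Spark's shuffle writer):

    def timedWrite(metrics: ShuffleWriteMetrics, bytes: Array[Byte], write: Array[Byte] => Unit) {
      val start = System.nanoTime()
      write(bytes)                                           // the potentially blocking disk write
      metrics.shuffleWriteTime += System.nanoTime() - start
      metrics.shuffleBytesWritten += bytes.length
    }
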
core/src/main/scala/org/apache/spark/package.scala

Lines changed: 2 additions & 0 deletions
@@ -15,6 +15,8 @@
 * limitations under the License.
 */
 
+package org.apache
+
 /**
 * Core Spark functionality. [[org.apache.spark.SparkContext]] serves as the main entry point to
 * Spark, while [[org.apache.spark.rdd.RDD]] is the data type representing a distributed collection,

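Several classes in this commit gain a private[apache] qualifier, which in Scala means: visible to everything under the org.apache package, hidden from code outside it. A small standalone illustration of the semantics (names invented):

    package org.apache.example {
      private[apache] class OnlyForApache
    }

    package org.apache.other {
      class Fine {
        val ok = new org.apache.example.OnlyForApache    // compiles: still inside org.apache
      }
    }

    package com.elsewhere {
      class Broken {
        // val no = new org.apache.example.OnlyForApache // would not compile: outside org.apache
      }
    }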