From d29d4b40fe13fee51aa6da0f7a14c5fee321438b Mon Sep 17 00:00:00 2001 From: Josiah Samuel Date: Wed, 8 Mar 2017 01:36:12 -0500 Subject: [PATCH 1/6] include additional metrics into generated code --- .../catalyst/UserStatsReportListener.scala | 74 ++++++++++ .../spark/sql/catalyst/UserTaskMetrics.scala | 137 ++++++++++++++++++ .../codegen/GenerateUnsafeProjection.scala | 6 + .../sql/execution/WholeStageCodegenExec.scala | 6 +- 4 files changed, 222 insertions(+), 1 deletion(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserStatsReportListener.scala create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserTaskMetrics.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserStatsReportListener.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserStatsReportListener.scala new file mode 100644 index 0000000000000..2bbd92269e51c --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserStatsReportListener.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst + +import scala.collection.mutable +import org.apache.spark.executor.TaskMetrics +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler._ + +class UserStatsReportListener extends SparkListener with Logging { + + private val taskInfoMetrics = mutable.Buffer[(TaskInfo, TaskMetrics)]() + + private val taskInfoUserMetrics = mutable.Buffer[(TaskInfo, Seq[AccumulableInfo])]() + + override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { + val info = taskEnd.taskInfo + val metrics = taskEnd.taskMetrics + if (info != null && metrics != null) { + taskInfoMetrics += ((info, metrics)) + } + } + + override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { + implicit val sc = stageCompleted + this.logInfo(s"Finished stage: ${getStatusDetail(stageCompleted.stageInfo)}") + + val accumulatorUpdates = mutable.Buffer[(Long, Any)]() + + taskInfoMetrics.map{ case(_, metrics) => + metrics.externalAccums.map(a => + a.toInfo(Some(a.value), None)).filter(_.update.isDefined) + .map(accum => { + accumulatorUpdates += + ((accum.id, accum.name.getOrElse("") + ":" + accum.update.get.toString)) + }) + } + + accumulatorUpdates.groupBy(_._1).map { case (accumulatorId, values) => + accumulatorId -> + UserTaskMetrics.stringValue(values.map(_._2)) + }.foreach(x => this.logInfo(x._1 + " : " + x._2)) + + taskInfoMetrics.clear() + } + + private def getStatusDetail(info: StageInfo): String = { + val failureReason = info.failureReason.map("(" + _ + ")").getOrElse("") + val timeTaken = info.submissionTime.map( + x => info.completionTime.getOrElse(System.currentTimeMillis()) - x + ).getOrElse("-") + + s"Stage(${info.stageId}, ${info.attemptId}); Name: '${info.name}'; " + + s"Status: ${info.getStatusString}$failureReason; numTasks: ${info.numTasks}; " + + s"Took: $timeTaken msec" + } + +} + diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserTaskMetrics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserTaskMetrics.scala new file mode 100644 index 0000000000000..8e3a1f55624f2 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserTaskMetrics.scala @@ -0,0 +1,137 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.spark.sql.catalyst + +import java.text.NumberFormat + +import org.apache.spark.scheduler.AccumulableInfo +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext +import org.apache.spark.util.{AccumulatorContext, AccumulatorV2} +import org.apache.spark.{SparkContext, TaskContext} +import scala.collection.mutable.ArrayBuffer + +class UserTaskMetric(initValue: Long = 0L) + extends AccumulatorV2[Long, Long] { + private[this] var _value = initValue + private var _zeroValue = initValue + + override def copy(): UserTaskMetric = { + val newAcc = new UserTaskMetric(_value) + newAcc._zeroValue = initValue + newAcc + } + + override def reset(): Unit = _value = _zeroValue + + override def merge(other: AccumulatorV2[Long, Long]): Unit = other match { + case o: UserTaskMetric => _value += o.value + case _ => throw new UnsupportedOperationException( + s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") + } + + override def isZero(): Boolean = _value == _zeroValue + + override def add(v: Long): Unit = _value += v + + def +=(v: Long): Unit = _value += v + + override def value: Long = _value + + def showMetricName: String = metadata.name match { + case Some(name) => name + case None => "" + } + + // Provide special identifier as metadata so we can tell that this is a `UserTaskMetric` later + private[spark] override def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = { + new AccumulableInfo( + id, name, update, value, true, true, Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)) + } +} + + +private[spark] object UserTaskMetrics { + def registerWithTaskContext(acc: UserTaskMetric): Unit = { + if (acc.isAtDriverSide) { + val taskContext = TaskContext.get() + if (taskContext != null) { + acc.metadata.name match { + // scalastyle:off println + case Some(x) => println("User Register Accumulator : " + x) + // scalastyle:on + case None => println("User Register Accumulator : None ") + } + taskContext.registerAccumulator(acc) + } + } else { + acc.metadata.name match { + // scalastyle:off println + case Some(x) => println("User Driver Register Accumulator : " + x) + // scalastyle:on + case None => println("User Driver Register Accumulator : None ") + } + } + } + + private val sc = SparkContext.getOrCreate() + + def createMetric(sc: SparkContext, name: String): UserTaskMetric = { + val acc = new UserTaskMetric() + acc.register(sc, name = Some(name), countFailedValues = false) + // registerWithTaskContext(acc) + acc + } + + def metricTerm(ctx: CodegenContext, name: String, desc: String): String = { + val acc = createMetric(sc, desc) + UserTaskMetrics.registerWithTaskContext(acc) + ctx.addReferenceObj(name, acc ) + } + + /** + * A function that defines how we aggregate the final accumulator results among all tasks, + * and represent it in string for a SQL physical operator. 
+ */ + def stringValue(valuesInput: Seq[Any]): String = { + var valStr = "" + val values = valuesInput.map(valuesTmp => { + val vtmp = valuesInput.asInstanceOf[ArrayBuffer[String]].mkString("") + if (vtmp.contains(":")) { + valStr = vtmp.split(":")(0) + vtmp.split(":")(1).toLong + } else { + vtmp.toLong + } + }) + + val numberFormat = NumberFormat.getInstance() + + val validValues = values.filter(_ >= 0) + val Seq(sum, min, med, max) = { + val metric = if (validValues.isEmpty) { + Seq.fill(4)(0L) + } else { + val sorted = validValues.sorted + Seq(sorted.sum, sorted(0), sorted(validValues.length / 2), sorted(validValues.length - 1)) + } + metric.map(numberFormat.format) + } + s"$valStr: $sum ($min, $med, $max)" + } +} + diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala index 7e4c9089a2cb9..b4b59d1812081 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions.codegen import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types._ +import org.apache.spark.sql.catalyst.UserTaskMetrics /** * Generates a [[Projection]] that returns an [[UnsafeRow]]. @@ -363,6 +364,11 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro expressions: Seq[Expression], subexpressionEliminationEnabled: Boolean): UnsafeProjection = { val ctx = newCodeGenContext() + UserTaskMetrics.metricTerm(ctx, "userDefined1", "User Defined Sum Metrics 1") + UserTaskMetrics.metricTerm(ctx, "userDefined2", "User Defined Sum Metrics 2") + UserTaskMetrics.metricTerm(ctx, "userDefined3", "User Defined Sum Metrics 3") + UserTaskMetrics.metricTerm(ctx, "userDefined4", "User Defined Sum Metrics 4") + val eval = createCode(ctx, expressions, subexpressionEliminationEnabled) val codeBody = s""" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 516b9d5444d31..277fcfe06ebe7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import org.apache.spark.{broadcast, TaskContext} import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.{InternalRow, UserTaskMetrics} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.plans.physical.Partitioning @@ -310,6 +310,10 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co */ def doCodeGen(): (CodegenContext, CodeAndComment) = { val ctx = new CodegenContext + UserTaskMetrics.metricTerm(ctx, "userDefined1", "WholeStage User Defined Sum Metrics 1") + UserTaskMetrics.metricTerm(ctx, "userDefined2", "WholeStage User Defined Sum Metrics 2") + UserTaskMetrics.metricTerm(ctx, "userDefined3", "WholeStage User Defined Sum Metrics 3") + UserTaskMetrics.metricTerm(ctx, "userDefined4", "WholeStage User Defined Sum Metrics 4") val code = 
child.asInstanceOf[CodegenSupport].produce(ctx, this) val source = s""" public Object generate(Object[] references) { From c8004acbd3a4bc91c8793e974b6c5fc3632eb4cc Mon Sep 17 00:00:00 2001 From: Josiah Samuel Date: Wed, 8 Mar 2017 03:18:24 -0500 Subject: [PATCH 2/6] included user metrics in multiple codegen code --- .../spark/sql/catalyst/UserTaskMetrics.scala | 13 ------------- .../codegen/GenerateMutableProjection.scala | 6 ++++++ .../expressions/codegen/GenerateOrdering.scala | 6 ++++++ .../expressions/codegen/GeneratePredicate.scala | 6 ++++++ .../codegen/GenerateSafeProjection.scala | 6 ++++++ .../codegen/GenerateUnsafeProjection.scala | 8 ++++---- .../codegen/GenerateUnsafeRowJoiner.scala | 17 ++++++++++++++++- .../columnar/GenerateColumnAccessor.scala | 6 ++++++ 8 files changed, 50 insertions(+), 18 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserTaskMetrics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserTaskMetrics.scala index 8e3a1f55624f2..ffedfcbae229b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserTaskMetrics.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserTaskMetrics.scala @@ -70,21 +70,8 @@ private[spark] object UserTaskMetrics { if (acc.isAtDriverSide) { val taskContext = TaskContext.get() if (taskContext != null) { - acc.metadata.name match { - // scalastyle:off println - case Some(x) => println("User Register Accumulator : " + x) - // scalastyle:on - case None => println("User Register Accumulator : None ") - } taskContext.registerAccumulator(acc) } - } else { - acc.metadata.name match { - // scalastyle:off println - case Some(x) => println("User Driver Register Accumulator : " + x) - // scalastyle:on - case None => println("User Driver Register Accumulator : None ") - } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala index 4d732445544a8..3e7728abcc7f2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.expressions.codegen +import org.apache.spark.sql.catalyst.UserTaskMetrics import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp @@ -52,6 +53,11 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP expressions: Seq[Expression], useSubexprElimination: Boolean): MutableProjection = { val ctx = newCodeGenContext() + UserTaskMetrics.metricTerm(ctx, "userDefined1", "MutableProj User Defined Sum Metrics 1") + UserTaskMetrics.metricTerm(ctx, "userDefined2", "MutableProj User Defined Sum Metrics 2") + UserTaskMetrics.metricTerm(ctx, "userDefined3", "MutableProj User Defined Sum Metrics 3") + UserTaskMetrics.metricTerm(ctx, "userDefined4", "MutableProj User Defined Sum Metrics 4") + val (validExpr, index) = expressions.zipWithIndex.filter { case (NoOp, _) => false case _ => true diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala index 1cef95654a17b..e0130ee6ab9e9 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala @@ -22,6 +22,7 @@ import java.io.ObjectInputStream import com.esotericsoftware.kryo.{Kryo, KryoSerializable} import com.esotericsoftware.kryo.io.{Input, Output} +import org.apache.spark.sql.catalyst.UserTaskMetrics import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -123,6 +124,11 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR protected def create(ordering: Seq[SortOrder]): BaseOrdering = { val ctx = newCodeGenContext() + UserTaskMetrics.metricTerm(ctx, "userDefined1", "Ordering User Defined Sum Metrics 1") + UserTaskMetrics.metricTerm(ctx, "userDefined2", "Ordering User Defined Sum Metrics 2") + UserTaskMetrics.metricTerm(ctx, "userDefined3", "Ordering User Defined Sum Metrics 3") + UserTaskMetrics.metricTerm(ctx, "userDefined4", "Ordering User Defined Sum Metrics 4") + val comparisons = genComparisons(ctx, ordering) val codeBody = s""" public SpecificOrdering generate(Object[] references) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala index dcd1ed96a298e..96999c1da068e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.expressions.codegen +import org.apache.spark.sql.catalyst.UserTaskMetrics import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -46,6 +47,11 @@ object GeneratePredicate extends CodeGenerator[Expression, Predicate] { protected def create(predicate: Expression): Predicate = { val ctx = newCodeGenContext() + UserTaskMetrics.metricTerm(ctx, "userDefined1", "Predicate User Defined Sum Metrics 1") + UserTaskMetrics.metricTerm(ctx, "userDefined2", "Predicate User Defined Sum Metrics 2") + UserTaskMetrics.metricTerm(ctx, "userDefined3", "Predicate User Defined Sum Metrics 3") + UserTaskMetrics.metricTerm(ctx, "userDefined4", "Predicate User Defined Sum Metrics 4") + val eval = predicate.genCode(ctx) val codeBody = s""" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala index b1cb6edefb852..118bee007c535 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions.codegen import scala.annotation.tailrec +import org.apache.spark.sql.catalyst.UserTaskMetrics import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData} @@ -139,6 +140,11 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection] protected def create(expressions: 
Seq[Expression]): Projection = { val ctx = newCodeGenContext() + UserTaskMetrics.metricTerm(ctx, "userDefined1", "SafeProj User Defined Sum Metrics 1") + UserTaskMetrics.metricTerm(ctx, "userDefined2", "SafeProj User Defined Sum Metrics 2") + UserTaskMetrics.metricTerm(ctx, "userDefined3", "SafeProj User Defined Sum Metrics 3") + UserTaskMetrics.metricTerm(ctx, "userDefined4", "SafeProj User Defined Sum Metrics 4") + val expressionCodes = expressions.zipWithIndex.map { case (NoOp, _) => "" case (e, i) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala index b4b59d1812081..04a0274da5a3b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala @@ -364,10 +364,10 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro expressions: Seq[Expression], subexpressionEliminationEnabled: Boolean): UnsafeProjection = { val ctx = newCodeGenContext() - UserTaskMetrics.metricTerm(ctx, "userDefined1", "User Defined Sum Metrics 1") - UserTaskMetrics.metricTerm(ctx, "userDefined2", "User Defined Sum Metrics 2") - UserTaskMetrics.metricTerm(ctx, "userDefined3", "User Defined Sum Metrics 3") - UserTaskMetrics.metricTerm(ctx, "userDefined4", "User Defined Sum Metrics 4") + UserTaskMetrics.metricTerm(ctx, "userDefined1", "UnsafeProj User Defined Sum Metrics 1") + UserTaskMetrics.metricTerm(ctx, "userDefined2", "UnsafeProj User Defined Sum Metrics 2") + UserTaskMetrics.metricTerm(ctx, "userDefined3", "UnsafeProj User Defined Sum Metrics 3") + UserTaskMetrics.metricTerm(ctx, "userDefined4", "UnsafeProj User Defined Sum Metrics 4") val eval = createCode(ctx, expressions, subexpressionEliminationEnabled) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala index 4aa5ec82471ec..f7903dbe908f5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.expressions.codegen +import org.apache.spark.sql.catalyst.UserTaskMetrics import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow} import org.apache.spark.sql.types.StructType import org.apache.spark.unsafe.Platform @@ -156,16 +157,30 @@ object GenerateUnsafeRowJoiner extends CodeGenerator[(StructType, StructType), U } }.mkString("\n") + val ctx = newCodeGenContext() + UserTaskMetrics.metricTerm(ctx, "userDefined1", "UnsafeRowJoiner User Defined Sum Metrics 1") + UserTaskMetrics.metricTerm(ctx, "userDefined2", "UnsafeRowJoiner User Defined Sum Metrics 2") + UserTaskMetrics.metricTerm(ctx, "userDefined3", "UnsafeRowJoiner User Defined Sum Metrics 3") + UserTaskMetrics.metricTerm(ctx, "userDefined4", "UnsafeRowJoiner User Defined Sum Metrics 4") + // ------------------------ Finally, put everything together --------------------------- // val codeBody = s""" |public java.lang.Object generate(Object[] references) { - | return new SpecificUnsafeRowJoiner(); + | return new 
SpecificUnsafeRowJoiner(references); |} | |class SpecificUnsafeRowJoiner extends ${classOf[UnsafeRowJoiner].getName} { + | private Object[] references; + | ${ctx.declareMutableStates()} + | | private byte[] buf = new byte[64]; | private UnsafeRow out = new UnsafeRow(${schema1.size + schema2.size}); | + | public SpecificUnsafeRowJoiner(Object[] references) { + | this.references = references; + | ${ctx.initMutableStates()} + | } + | | public UnsafeRow join(UnsafeRow row1, UnsafeRow row2) { | // row1: ${schema1.size} fields, $bitset1Words words in bitset | // row2: ${schema2.size}, $bitset2Words words in bitset diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala index 14024d6c10558..6c36cf79c6566 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.columnar +import org.apache.spark.sql.catalyst.UserTaskMetrics import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -69,6 +70,11 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera protected def create(columnTypes: Seq[DataType]): ColumnarIterator = { val ctx = newCodeGenContext() + UserTaskMetrics.metricTerm(ctx, "userDefined1", "ColAccessor User Defined Sum Metrics 1") + UserTaskMetrics.metricTerm(ctx, "userDefined2", "ColAccessor User Defined Sum Metrics 2") + UserTaskMetrics.metricTerm(ctx, "userDefined3", "ColAccessor User Defined Sum Metrics 3") + UserTaskMetrics.metricTerm(ctx, "userDefined4", "ColAccessor User Defined Sum Metrics 4") + val numFields = columnTypes.size val (initializeAccessors, extractors) = columnTypes.zipWithIndex.map { case (dt, index) => val accessorName = ctx.freshName("accessor") From 91bb5999f379ddbfda9c7bb0f4de9b8006ed68c4 Mon Sep 17 00:00:00 2001 From: Josiah Samuel Date: Wed, 8 Mar 2017 07:07:23 -0500 Subject: [PATCH 3/6] minor bugs fixed --- .../sql/catalyst/UserStatsReportListener.scala | 2 +- .../spark/sql/catalyst/UserTaskMetrics.scala | 18 ++++++++++-------- .../codegen/GenerateUnsafeRowJoiner.scala | 2 +- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserStatsReportListener.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserStatsReportListener.scala index 2bbd92269e51c..ca7b7750fce28 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserStatsReportListener.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserStatsReportListener.scala @@ -54,7 +54,7 @@ class UserStatsReportListener extends SparkListener with Logging { accumulatorUpdates.groupBy(_._1).map { case (accumulatorId, values) => accumulatorId -> UserTaskMetrics.stringValue(values.map(_._2)) - }.foreach(x => this.logInfo(x._1 + " : " + x._2)) + }.foreach(x => this.logError(x._1 + " : " + x._2)) taskInfoMetrics.clear() } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserTaskMetrics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserTaskMetrics.scala index ffedfcbae229b..5f1603af81d0b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserTaskMetrics.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserTaskMetrics.scala @@ -96,14 +96,16 @@ private[spark] object UserTaskMetrics { */ def stringValue(valuesInput: Seq[Any]): String = { var valStr = "" - val values = valuesInput.map(valuesTmp => { - val vtmp = valuesInput.asInstanceOf[ArrayBuffer[String]].mkString("") - if (vtmp.contains(":")) { - valStr = vtmp.split(":")(0) - vtmp.split(":")(1).toLong - } else { - vtmp.toLong - } + + val values = valuesInput.flatMap(valuesTmp => { + valuesInput.asInstanceOf[ArrayBuffer[String]].map(vtmp => { + if (vtmp.contains(":")) { + valStr = vtmp.split(":")(0) + vtmp.split(":")(1).toLong + } else { + vtmp.toLong + } + }) }) val numberFormat = NumberFormat.getInstance() diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala index f7903dbe908f5..dc29ec2715c7c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala @@ -212,6 +212,6 @@ object GenerateUnsafeRowJoiner extends CodeGenerator[(StructType, StructType), U logDebug(s"SpecificUnsafeRowJoiner($schema1, $schema2):\n${CodeFormatter.format(code)}") val c = CodeGenerator.compile(code) - c.generate(Array.empty).asInstanceOf[UnsafeRowJoiner] + c.generate(ctx.references.toArray).asInstanceOf[UnsafeRowJoiner] } } From 9bf40e6565aa28a29a3f1c4089756e43b2d7caae Mon Sep 17 00:00:00 2001 From: Josiah Samuel Date: Fri, 10 Mar 2017 05:06:47 -0500 Subject: [PATCH 4/6] handle lazy serialization --- .../main/scala/org/apache/spark/util/AccumulatorV2.scala | 2 ++ .../org/apache/spark/sql/catalyst/UserTaskMetrics.scala | 9 ++++++++- .../sql/catalyst/expressions/codegen/CodeGenerator.scala | 5 +++-- .../expressions/codegen/GenerateMutableProjection.scala | 8 ++++---- .../catalyst/expressions/codegen/GenerateOrdering.scala | 8 ++++---- .../catalyst/expressions/codegen/GeneratePredicate.scala | 8 ++++---- .../expressions/codegen/GenerateSafeProjection.scala | 8 ++++---- .../expressions/codegen/GenerateUnsafeProjection.scala | 8 ++++---- .../expressions/codegen/GenerateUnsafeRowJoiner.scala | 8 ++++---- 9 files changed, 37 insertions(+), 27 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala index 00e0cf257cd4a..24d02a9661185 100644 --- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala +++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala @@ -152,6 +152,8 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable { */ def value: OUT + def flipDriverSide: Unit = atDriverSide = if (atDriverSide) false else true + // Called by Java when serializing an object final protected def writeReplace(): Any = { if (atDriverSide) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserTaskMetrics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserTaskMetrics.scala index 5f1603af81d0b..5ef3d81c193e2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserTaskMetrics.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserTaskMetrics.scala @@ -70,8 +70,11 @@ private[spark] object UserTaskMetrics { if (acc.isAtDriverSide) { val taskContext = TaskContext.get() if (taskContext 
!= null) { + acc.flipDriverSide taskContext.registerAccumulator(acc) } + } else { + acc.flipDriverSide } } @@ -80,11 +83,15 @@ private[spark] object UserTaskMetrics { def createMetric(sc: SparkContext, name: String): UserTaskMetric = { val acc = new UserTaskMetric() acc.register(sc, name = Some(name), countFailedValues = false) - // registerWithTaskContext(acc) acc } def metricTerm(ctx: CodegenContext, name: String, desc: String): String = { + val acc = createMetric(sc, desc) + ctx.addReferenceObj(name, acc ) + } + + def metricTermWithRegister(ctx: CodegenContext, name: String, desc: String): String = { val acc = createMetric(sc, desc) UserTaskMetrics.registerWithTaskContext(acc) ctx.addReferenceObj(name, acc ) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index cbf07c93c5bee..0e147b305c348 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -32,7 +32,7 @@ import scala.language.existentials import org.apache.spark.{DebugMetrics, SparkEnv} import org.apache.spark.internal.Logging import org.apache.spark.metrics.source.CodegenMetrics -import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.{InternalRow, UserTaskMetric} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} import org.apache.spark.sql.types._ @@ -922,7 +922,8 @@ object CodeGenerator extends Logging { classOf[UnsafeArrayData].getName, classOf[MapData].getName, classOf[UnsafeMapData].getName, - classOf[Expression].getName + classOf[Expression].getName, + classOf[UserTaskMetric].getName )) evaluator.setExtendedClass(classOf[GeneratedClass]) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala index 3e7728abcc7f2..838c0daf460d4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala @@ -53,10 +53,10 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP expressions: Seq[Expression], useSubexprElimination: Boolean): MutableProjection = { val ctx = newCodeGenContext() - UserTaskMetrics.metricTerm(ctx, "userDefined1", "MutableProj User Defined Sum Metrics 1") - UserTaskMetrics.metricTerm(ctx, "userDefined2", "MutableProj User Defined Sum Metrics 2") - UserTaskMetrics.metricTerm(ctx, "userDefined3", "MutableProj User Defined Sum Metrics 3") - UserTaskMetrics.metricTerm(ctx, "userDefined4", "MutableProj User Defined Sum Metrics 4") + UserTaskMetrics.metricTermWithRegister(ctx, "userDefined1", "MutableProj User Defined Sum Metrics 1") + UserTaskMetrics.metricTermWithRegister(ctx, "userDefined2", "MutableProj User Defined Sum Metrics 2") + UserTaskMetrics.metricTermWithRegister(ctx, "userDefined3", "MutableProj User Defined Sum Metrics 3") + UserTaskMetrics.metricTermWithRegister(ctx, "userDefined4", "MutableProj User Defined Sum Metrics 4") val (validExpr, index) = expressions.zipWithIndex.filter { case (NoOp, _) => false diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala index e0130ee6ab9e9..31efecf1dcc42 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala @@ -124,10 +124,10 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR protected def create(ordering: Seq[SortOrder]): BaseOrdering = { val ctx = newCodeGenContext() - UserTaskMetrics.metricTerm(ctx, "userDefined1", "Ordering User Defined Sum Metrics 1") - UserTaskMetrics.metricTerm(ctx, "userDefined2", "Ordering User Defined Sum Metrics 2") - UserTaskMetrics.metricTerm(ctx, "userDefined3", "Ordering User Defined Sum Metrics 3") - UserTaskMetrics.metricTerm(ctx, "userDefined4", "Ordering User Defined Sum Metrics 4") + UserTaskMetrics.metricTermWithRegister(ctx, "userDefined1", "Ordering User Defined Sum Metrics 1") + UserTaskMetrics.metricTermWithRegister(ctx, "userDefined2", "Ordering User Defined Sum Metrics 2") + UserTaskMetrics.metricTermWithRegister(ctx, "userDefined3", "Ordering User Defined Sum Metrics 3") + UserTaskMetrics.metricTermWithRegister(ctx, "userDefined4", "Ordering User Defined Sum Metrics 4") val comparisons = genComparisons(ctx, ordering) val codeBody = s""" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala index 96999c1da068e..4c902d5529a80 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala @@ -47,10 +47,10 @@ object GeneratePredicate extends CodeGenerator[Expression, Predicate] { protected def create(predicate: Expression): Predicate = { val ctx = newCodeGenContext() - UserTaskMetrics.metricTerm(ctx, "userDefined1", "Predicate User Defined Sum Metrics 1") - UserTaskMetrics.metricTerm(ctx, "userDefined2", "Predicate User Defined Sum Metrics 2") - UserTaskMetrics.metricTerm(ctx, "userDefined3", "Predicate User Defined Sum Metrics 3") - UserTaskMetrics.metricTerm(ctx, "userDefined4", "Predicate User Defined Sum Metrics 4") + UserTaskMetrics.metricTermWithRegister(ctx, "userDefined1", "Predicate User Defined Sum Metrics 1") + UserTaskMetrics.metricTermWithRegister(ctx, "userDefined2", "Predicate User Defined Sum Metrics 2") + UserTaskMetrics.metricTermWithRegister(ctx, "userDefined3", "Predicate User Defined Sum Metrics 3") + UserTaskMetrics.metricTermWithRegister(ctx, "userDefined4", "Predicate User Defined Sum Metrics 4") val eval = predicate.genCode(ctx) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala index 118bee007c535..616e79eaba52f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala @@ -140,10 +140,10 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection] 
protected def create(expressions: Seq[Expression]): Projection = { val ctx = newCodeGenContext() - UserTaskMetrics.metricTerm(ctx, "userDefined1", "SafeProj User Defined Sum Metrics 1") - UserTaskMetrics.metricTerm(ctx, "userDefined2", "SafeProj User Defined Sum Metrics 2") - UserTaskMetrics.metricTerm(ctx, "userDefined3", "SafeProj User Defined Sum Metrics 3") - UserTaskMetrics.metricTerm(ctx, "userDefined4", "SafeProj User Defined Sum Metrics 4") + UserTaskMetrics.metricTermWithRegister(ctx, "userDefined1", "SafeProj User Defined Sum Metrics 1") + UserTaskMetrics.metricTermWithRegister(ctx, "userDefined2", "SafeProj User Defined Sum Metrics 2") + UserTaskMetrics.metricTermWithRegister(ctx, "userDefined3", "SafeProj User Defined Sum Metrics 3") + UserTaskMetrics.metricTermWithRegister(ctx, "userDefined4", "SafeProj User Defined Sum Metrics 4") val expressionCodes = expressions.zipWithIndex.map { case (NoOp, _) => "" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala index 04a0274da5a3b..da25210c0c746 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala @@ -364,10 +364,10 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro expressions: Seq[Expression], subexpressionEliminationEnabled: Boolean): UnsafeProjection = { val ctx = newCodeGenContext() - UserTaskMetrics.metricTerm(ctx, "userDefined1", "UnsafeProj User Defined Sum Metrics 1") - UserTaskMetrics.metricTerm(ctx, "userDefined2", "UnsafeProj User Defined Sum Metrics 2") - UserTaskMetrics.metricTerm(ctx, "userDefined3", "UnsafeProj User Defined Sum Metrics 3") - UserTaskMetrics.metricTerm(ctx, "userDefined4", "UnsafeProj User Defined Sum Metrics 4") + UserTaskMetrics.metricTermWithRegister(ctx, "userDefined1", "UnsafeProj User Defined Sum Metrics 1") + UserTaskMetrics.metricTermWithRegister(ctx, "userDefined2", "UnsafeProj User Defined Sum Metrics 2") + UserTaskMetrics.metricTermWithRegister(ctx, "userDefined3", "UnsafeProj User Defined Sum Metrics 3") + UserTaskMetrics.metricTermWithRegister(ctx, "userDefined4", "UnsafeProj User Defined Sum Metrics 4") val eval = createCode(ctx, expressions, subexpressionEliminationEnabled) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala index dc29ec2715c7c..da7ebf946a271 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala @@ -158,10 +158,10 @@ object GenerateUnsafeRowJoiner extends CodeGenerator[(StructType, StructType), U }.mkString("\n") val ctx = newCodeGenContext() - UserTaskMetrics.metricTerm(ctx, "userDefined1", "UnsafeRowJoiner User Defined Sum Metrics 1") - UserTaskMetrics.metricTerm(ctx, "userDefined2", "UnsafeRowJoiner User Defined Sum Metrics 2") - UserTaskMetrics.metricTerm(ctx, "userDefined3", "UnsafeRowJoiner User Defined Sum Metrics 3") - UserTaskMetrics.metricTerm(ctx, "userDefined4", "UnsafeRowJoiner User Defined Sum Metrics 4") + 
UserTaskMetrics.metricTermWithRegister(ctx, "userDefined1", "UnsafeRowJoiner User Defined Sum Metrics 1") + UserTaskMetrics.metricTermWithRegister(ctx, "userDefined2", "UnsafeRowJoiner User Defined Sum Metrics 2") + UserTaskMetrics.metricTermWithRegister(ctx, "userDefined3", "UnsafeRowJoiner User Defined Sum Metrics 3") + UserTaskMetrics.metricTermWithRegister(ctx, "userDefined4", "UnsafeRowJoiner User Defined Sum Metrics 4") // ------------------------ Finally, put everything together --------------------------- // val codeBody = s""" From 5db85b996ffe7d83e7fcb7e3b5bd293ce2d5fc05 Mon Sep 17 00:00:00 2001 From: Josiah Samuel Date: Mon, 13 Mar 2017 09:23:40 -0500 Subject: [PATCH 5/6] fix cluster issue with accumulators --- .../spark/sql/catalyst/UserTaskMetrics.scala | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserTaskMetrics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserTaskMetrics.scala index 5ef3d81c193e2..42def07c9dce3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserTaskMetrics.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserTaskMetrics.scala @@ -22,7 +22,7 @@ import java.text.NumberFormat import org.apache.spark.scheduler.AccumulableInfo import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext import org.apache.spark.util.{AccumulatorContext, AccumulatorV2} -import org.apache.spark.{SparkContext, TaskContext} +import org.apache.spark.{SparkContext, SparkEnv, TaskContext} import scala.collection.mutable.ArrayBuffer class UserTaskMetric(initValue: Long = 0L) @@ -78,23 +78,29 @@ private[spark] object UserTaskMetrics { } } - private val sc = SparkContext.getOrCreate() + // val sc = SparkContext.getOrCreate(SparkEnv.get.conf) - def createMetric(sc: SparkContext, name: String): UserTaskMetric = { + def createMetric(name: String): UserTaskMetric = { + val sc = SparkContext.getOrCreate(SparkEnv.get.conf) val acc = new UserTaskMetric() acc.register(sc, name = Some(name), countFailedValues = false) acc } def metricTerm(ctx: CodegenContext, name: String, desc: String): String = { - val acc = createMetric(sc, desc) + val acc = createMetric(desc) ctx.addReferenceObj(name, acc ) } def metricTermWithRegister(ctx: CodegenContext, name: String, desc: String): String = { - val acc = createMetric(sc, desc) - UserTaskMetrics.registerWithTaskContext(acc) - ctx.addReferenceObj(name, acc ) + val str = if (SparkEnv.get.executorId == "driver") { + val acc = createMetric(desc) + UserTaskMetrics.registerWithTaskContext(acc) + ctx.addReferenceObj(name, acc ) + } else { + "" + } + str } /** From 1099817509bff6d8d2b83cf05b491c2ae55046a3 Mon Sep 17 00:00:00 2001 From: Josiah Samuel Date: Tue, 14 Mar 2017 05:24:15 -0500 Subject: [PATCH 6/6] fix for metrics adding in executor side generated code --- .../spark/sql/catalyst/UserTaskMetrics.scala | 7 ++++ .../catalyst/encoders/ExpressionEncoder.scala | 12 +++++- .../expressions/codegen/CodeGenerator.scala | 9 +++++ .../codegen/GenerateSafeProjection.scala | 20 ++++++++-- .../codegen/GenerateUnsafeProjection.scala | 38 ++++++++++++++++--- .../execution/datasources/FileFormat.scala | 14 ++++++- 6 files changed, 86 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserTaskMetrics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserTaskMetrics.scala index 42def07c9dce3..204813fb75883 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserTaskMetrics.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserTaskMetrics.scala @@ -92,6 +92,13 @@ private[spark] object UserTaskMetrics { ctx.addReferenceObj(name, acc ) } + def addMetrics(ctx: CodegenContext, ref: ArrayBuffer[Any]): Unit = { + ref.zipWithIndex.foreach { + case (obj, index) => + ctx.addReferenceObj("usermetrics_" + index, obj) + } + } + def metricTermWithRegister(ctx: CodegenContext, name: String, desc: String): String = { val str = if (SparkEnv.get.executorId == "driver") { val acc = createMetric(desc) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala index 9c4818db6333b..d17a5f3d1d89e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala @@ -21,7 +21,7 @@ import scala.reflect.ClassTag import scala.reflect.runtime.universe.{typeTag, TypeTag} import org.apache.spark.sql.Encoder -import org.apache.spark.sql.catalyst.{InternalRow, JavaTypeInference, ScalaReflection} +import org.apache.spark.sql.catalyst.{InternalRow, JavaTypeInference, ScalaReflection, UserTaskMetrics} import org.apache.spark.sql.catalyst.analysis.{Analyzer, GetColumnByOrdinal, SimpleAnalyzer, UnresolvedAttribute, UnresolvedExtractValue} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateSafeProjection, GenerateUnsafeProjection} @@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.optimizer.SimplifyCasts import org.apache.spark.sql.catalyst.plans.logical.{CatalystSerde, DeserializeToObject, LocalRelation} import org.apache.spark.sql.types.{BooleanType, ObjectType, StructField, StructType} import org.apache.spark.util.Utils +import scala.collection.mutable /** * A factory for constructing encoders that convert objects and primitives to and from the @@ -268,8 +269,15 @@ case class ExpressionEncoder[T]( @transient private lazy val inputRow = new GenericInternalRow(1) + val references: mutable.ArrayBuffer[Any] = new mutable.ArrayBuffer[Any]() + + references += UserTaskMetrics.createMetric("Safe User Defined Sum Metrics 1") + references += UserTaskMetrics.createMetric("Safe User Defined Sum Metrics 2") + references += UserTaskMetrics.createMetric("Safe User Defined Sum Metrics 3") + references += UserTaskMetrics.createMetric("Safe User Defined Sum Metrics 4") + @transient - private lazy val constructProjection = GenerateSafeProjection.generate(deserializer :: Nil) + private lazy val constructProjection = GenerateSafeProjection.generate(deserializer :: Nil, references) /** * Returns a new set (with unique ids) of [[NamedExpression]] that represent the serialized form diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 0e147b305c348..d2b921a2d9209 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -872,6 +872,15 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin /** Generates the requested evaluator given already 
bound expression(s). */ def generate(expressions: InType): OutType = create(canonicalize(expressions)) + /** Generates the requested evaluator binding the given expression(s) to the inputSchema. */ + def generate(expressions: InType, inputSchema: Seq[Attribute], + ref: ArrayBuffer[Any]): OutType = + generate(bind(expressions, inputSchema), ref) + + /** Generates the requested evaluator given already bound expression(s). */ + def generate(expressions: InType, ref: ArrayBuffer[Any]): OutType = + create(canonicalize(expressions)) + /** * Create a new codegen context for expression evaluator, used to store those * expressions that don't support codegen diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala index 616e79eaba52f..83fb5c878bff3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.expressions.codegen import scala.annotation.tailrec import org.apache.spark.sql.catalyst.UserTaskMetrics +import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData} @@ -138,13 +139,24 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection] case _ => ExprCode("", "false", input) } + override def generate( + expressions: Seq[Expression], + references: ArrayBuffer[Any]): Projection = { + create(expressions, references) + } + + protected def create(expressions: Seq[Expression], ref: ArrayBuffer[Any]): Projection = { + val ctx = newCodeGenContext() + UserTaskMetrics.addMetrics(ctx, ref) + create(expressions, ctx) + } + protected def create(expressions: Seq[Expression]): Projection = { val ctx = newCodeGenContext() - UserTaskMetrics.metricTermWithRegister(ctx, "userDefined1", "SafeProj User Defined Sum Metrics 1") - UserTaskMetrics.metricTermWithRegister(ctx, "userDefined2", "SafeProj User Defined Sum Metrics 2") - UserTaskMetrics.metricTermWithRegister(ctx, "userDefined3", "SafeProj User Defined Sum Metrics 3") - UserTaskMetrics.metricTermWithRegister(ctx, "userDefined4", "SafeProj User Defined Sum Metrics 4") + create(expressions, ctx) + } + protected def create(expressions: Seq[Expression], ctx: CodegenContext): Projection = { val expressionCodes = expressions.zipWithIndex.map { case (NoOp, _) => "" case (e, i) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala index da25210c0c746..275dd83745bd9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.expressions.codegen import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types._ import org.apache.spark.sql.catalyst.UserTaskMetrics +import scala.collection.mutable.ArrayBuffer /** * Generates a [[Projection]] that 
returns an [[UnsafeRow]]. @@ -356,19 +357,44 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro create(canonicalize(expressions), subexpressionEliminationEnabled) } + override def generate(expressions: Seq[Expression], + ref: ArrayBuffer[Any]): UnsafeProjection = { + + create(canonicalize(expressions), ref) + } + protected def create(references: Seq[Expression]): UnsafeProjection = { create(references, subexpressionEliminationEnabled = false) } + protected def create(references: Seq[Expression], + ctxReferences: ArrayBuffer[Any]): UnsafeProjection = { + create(references, subexpressionEliminationEnabled = false, ctxReferences) + } + private def create( - expressions: Seq[Expression], - subexpressionEliminationEnabled: Boolean): UnsafeProjection = { + expressions: Seq[Expression], + subexpressionEliminationEnabled: Boolean, + references: ArrayBuffer[Any]): UnsafeProjection = { + val ctx = newCodeGenContext() + + UserTaskMetrics.addMetrics(ctx, references) + + create(expressions, subexpressionEliminationEnabled, ctx) + } + + private def create( + expressions: Seq[Expression], + subexpressionEliminationEnabled: Boolean): UnsafeProjection = { val ctx = newCodeGenContext() - UserTaskMetrics.metricTermWithRegister(ctx, "userDefined1", "UnsafeProj User Defined Sum Metrics 1") - UserTaskMetrics.metricTermWithRegister(ctx, "userDefined2", "UnsafeProj User Defined Sum Metrics 2") - UserTaskMetrics.metricTermWithRegister(ctx, "userDefined3", "UnsafeProj User Defined Sum Metrics 3") - UserTaskMetrics.metricTermWithRegister(ctx, "userDefined4", "UnsafeProj User Defined Sum Metrics 4") + create(expressions, subexpressionEliminationEnabled, ctx) + + } + private def create( + expressions: Seq[Expression], + subexpressionEliminationEnabled: Boolean, + ctx: CodegenContext): UnsafeProjection = { val eval = createCode(ctx, expressions, subexpressionEliminationEnabled) val codeBody = s""" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala index 4f4aaaa5026fb..05cca77f1c330 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala @@ -23,11 +23,12 @@ import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompres import org.apache.hadoop.mapreduce.Job import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.{InternalRow, UserTaskMetrics} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType +import scala.collection.mutable /** @@ -119,14 +120,23 @@ trait FileFormat { val dataReader = buildReader( sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf) + val references: mutable.ArrayBuffer[Any] = new mutable.ArrayBuffer[Any]() + + references += UserTaskMetrics.createMetric("UnSafe User Defined Sum Metrics 1") + references += UserTaskMetrics.createMetric("UnSafe User Defined Sum Metrics 2") + references += UserTaskMetrics.createMetric("UnSafe User Defined Sum Metrics 3") + references += UserTaskMetrics.createMetric("UnSafe User Defined Sum Metrics 4") + new (PartitionedFile => Iterator[InternalRow]) with Serializable { private val fullSchema = 
requiredSchema.toAttributes ++ partitionSchema.toAttributes private val joinedRow = new JoinedRow() + // Using lazy val to avoid serialization private lazy val appendPartitionColumns = - GenerateUnsafeProjection.generate(fullSchema, fullSchema) + GenerateUnsafeProjection.generate(fullSchema, fullSchema, + references) override def apply(file: PartitionedFile): Iterator[InternalRow] = { // Using local val to avoid per-row lazy val check (pre-mature optimization?...)
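
Note (not part of the patch series): the patches shown here create the new UserStatsReportListener but never attach it to a SparkContext, so the per-stage summaries it logs from onStageCompleted only appear once the listener is registered by the application. Below is a minimal sketch of how that wiring might look, assuming the patched Spark build is on the classpath; the object name UserMetricsDemo, the application name, the master URL, and the example query are illustrative only and do not appear in the patches.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.UserStatsReportListener

object UserMetricsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("user-metrics-demo")   // illustrative name
      .master("local[*]")             // illustrative master
      .getOrCreate()

    // Attach the listener added in PATCH 1/6. Each completed stage then logs
    // the aggregated user accumulators in the "<name>: sum (min, med, max)"
    // form produced by UserTaskMetrics.stringValue.
    spark.sparkContext.addSparkListener(new UserStatsReportListener())

    // Any query that goes through whole-stage codegen or the patched
    // projection/ordering/predicate generators picks up the injected
    // "User Defined Sum Metrics" accumulators in its generated-code references.
    spark.range(0L, 1000000L)
      .selectExpr("id % 10 AS k", "id AS v")
      .groupBy("k")
      .count()
      .collect()

    spark.stop()
  }
}

SparkContext.addSparkListener accepts any SparkListenerInterface, which UserStatsReportListener satisfies by extending SparkListener; alternatively the class could be named in spark.extraListeners, but neither mechanism is exercised by the patches themselves.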