diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index 00e0cf257cd4a..24d02a9661185 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -152,6 +152,8 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
    */
   def value: OUT
 
+  def flipDriverSide: Unit = atDriverSide = !atDriverSide
+
   // Called by Java when serializing an object
   final protected def writeReplace(): Any = {
     if (atDriverSide) {
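Note: flipDriverSide simply toggles the accumulator's driver/executor flag. A minimal sketch of the intended call pattern, mirroring UserTaskMetrics.registerWithTaskContext further down in this patch (the surrounding setup is illustrative, not part of the diff):

    // Sketch only: hand an already-registered accumulator to task-side code.
    val acc = new UserTaskMetric()          // freshly created accumulators are driver-side
    val taskContext = TaskContext.get()
    if (taskContext != null) {
      acc.flipDriverSide                    // mark this copy as executor-side
      taskContext.registerAccumulator(acc)  // its updates are then reported with the task
    }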
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserStatsReportListener.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserStatsReportListener.scala
new file mode 100644
index 0000000000000..ca7b7750fce28
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserStatsReportListener.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import scala.collection.mutable
+
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler._
+
+class UserStatsReportListener extends SparkListener with Logging {
+
+  private val taskInfoMetrics = mutable.Buffer[(TaskInfo, TaskMetrics)]()
+
+  private val taskInfoUserMetrics = mutable.Buffer[(TaskInfo, Seq[AccumulableInfo])]()
+
+  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+    val info = taskEnd.taskInfo
+    val metrics = taskEnd.taskMetrics
+    if (info != null && metrics != null) {
+      taskInfoMetrics += ((info, metrics))
+    }
+  }
+
+  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
+    this.logInfo(s"Finished stage: ${getStatusDetail(stageCompleted.stageInfo)}")
+
+    // Collect (accumulator id, "name:update") pairs from every finished task in this stage.
+    val accumulatorUpdates = mutable.Buffer[(Long, Any)]()
+
+    taskInfoMetrics.foreach { case (_, metrics) =>
+      metrics.externalAccums
+        .map(a => a.toInfo(Some(a.value), None))
+        .filter(_.update.isDefined)
+        .foreach { accum =>
+          accumulatorUpdates +=
+            ((accum.id, accum.name.getOrElse("") + ":" + accum.update.get.toString))
+        }
+    }
+
+    accumulatorUpdates.groupBy(_._1).foreach { case (accumulatorId, values) =>
+      this.logInfo(s"$accumulatorId : ${UserTaskMetrics.stringValue(values.map(_._2))}")
+    }
+
+    taskInfoMetrics.clear()
+  }
+
+  private def getStatusDetail(info: StageInfo): String = {
+    val failureReason = info.failureReason.map("(" + _ + ")").getOrElse("")
+    val timeTaken = info.submissionTime.map(
+      x => info.completionTime.getOrElse(System.currentTimeMillis()) - x
+    ).getOrElse("-")
+
+    s"Stage(${info.stageId}, ${info.attemptId}); Name: '${info.name}'; " +
+      s"Status: ${info.getStatusString}$failureReason; numTasks: ${info.numTasks}; " +
+      s"Took: $timeTaken msec"
+  }
+
+}
+
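A hedged usage sketch (not part of the patch): UserStatsReportListener is an ordinary SparkListener, so it can be attached to a running SparkContext; once a stage finishes it logs one aggregated line per user accumulator it saw:

    // Illustration only; assumes an existing SparkSession named `spark`.
    spark.sparkContext.addSparkListener(new org.apache.spark.sql.catalyst.UserStatsReportListener)
    spark.range(0, 1000000).selectExpr("sum(id)").collect()
    // The driver log now contains lines like:
    //   <accumulator id> : <metric name>: <sum> (<min>, <med>, <max>)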
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserTaskMetrics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserTaskMetrics.scala
new file mode 100644
index 0000000000000..204813fb75883
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/UserTaskMetrics.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import java.text.NumberFormat
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.scheduler.AccumulableInfo
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+import org.apache.spark.util.{AccumulatorContext, AccumulatorV2}
+
+class UserTaskMetric(initValue: Long = 0L) extends AccumulatorV2[Long, Long] {
+
+  private[this] var _value = initValue
+  private var _zeroValue = initValue
+
+  override def copy(): UserTaskMetric = {
+    val newAcc = new UserTaskMetric(_value)
+    newAcc._zeroValue = initValue
+    newAcc
+  }
+
+  override def reset(): Unit = _value = _zeroValue
+
+  override def merge(other: AccumulatorV2[Long, Long]): Unit = other match {
+    case o: UserTaskMetric => _value += o.value
+    case _ => throw new UnsupportedOperationException(
+      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
+  }
+
+  override def isZero: Boolean = _value == _zeroValue
+
+  override def add(v: Long): Unit = _value += v
+
+  def +=(v: Long): Unit = _value += v
+
+  override def value: Long = _value
+
+  def showMetricName: String = name.getOrElse("")
+
+  // Provide a special identifier as metadata so we can tell that this is a `UserTaskMetric` later
+  private[spark] override def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
+    new AccumulableInfo(
+      id, name, update, value, true, true, Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER))
+  }
+}
+
+
+private[spark] object UserTaskMetrics {
+
+  def registerWithTaskContext(acc: UserTaskMetric): Unit = {
+    if (acc.isAtDriverSide) {
+      val taskContext = TaskContext.get()
+      if (taskContext != null) {
+        acc.flipDriverSide
+        taskContext.registerAccumulator(acc)
+      }
+    } else {
+      acc.flipDriverSide
+    }
+  }
+
+  def createMetric(name: String): UserTaskMetric = {
+    val sc = SparkContext.getOrCreate(SparkEnv.get.conf)
+    val acc = new UserTaskMetric()
+    acc.register(sc, name = Some(name), countFailedValues = false)
+    acc
+  }
+
+  def metricTerm(ctx: CodegenContext, name: String, desc: String): String = {
+    val acc = createMetric(desc)
+    ctx.addReferenceObj(name, acc)
+  }
+
+  def addMetrics(ctx: CodegenContext, ref: ArrayBuffer[Any]): Unit = {
+    ref.zipWithIndex.foreach { case (obj, index) =>
+      ctx.addReferenceObj("usermetrics_" + index, obj)
+    }
+  }
+
+  def metricTermWithRegister(ctx: CodegenContext, name: String, desc: String): String = {
+    if (SparkEnv.get.executorId == "driver") {
+      val acc = createMetric(desc)
+      UserTaskMetrics.registerWithTaskContext(acc)
+      ctx.addReferenceObj(name, acc)
+    } else {
+      ""
+    }
+  }
+
+  /**
+   * A function that defines how we aggregate the final accumulator results among all tasks,
+   * and represents them as a string for a SQL physical operator.
+   */
+  def stringValue(valuesInput: Seq[Any]): String = {
+    var valStr = ""
+
+    // Each update arrives either as a plain number or as a "name:value" string.
+    val values = valuesInput.map { v =>
+      val s = v.toString
+      if (s.contains(":")) {
+        val Array(metricName, metricValue) = s.split(":", 2)
+        valStr = metricName
+        metricValue.toLong
+      } else {
+        s.toLong
+      }
+    }
+
+    val numberFormat = NumberFormat.getInstance()
+
+    val validValues = values.filter(_ >= 0)
+    val Seq(sum, min, med, max) = {
+      val metric = if (validValues.isEmpty) {
+        Seq.fill(4)(0L)
+      } else {
+        val sorted = validValues.sorted
+        Seq(sorted.sum, sorted(0), sorted(validValues.length / 2), sorted(validValues.length - 1))
+      }
+      metric.map(numberFormat.format)
+    }
+    s"$valStr: $sum ($min, $med, $max)"
+  }
+}
+
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index 9c4818db6333b..d17a5f3d1d89e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -21,7 +21,7 @@ import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.{typeTag, TypeTag}
 
 import org.apache.spark.sql.Encoder
-import org.apache.spark.sql.catalyst.{InternalRow, JavaTypeInference, ScalaReflection}
+import org.apache.spark.sql.catalyst.{InternalRow, JavaTypeInference, ScalaReflection, UserTaskMetrics}
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, GetColumnByOrdinal, SimpleAnalyzer, UnresolvedAttribute, UnresolvedExtractValue}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateSafeProjection, GenerateUnsafeProjection}
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.optimizer.SimplifyCasts
 import org.apache.spark.sql.catalyst.plans.logical.{CatalystSerde, DeserializeToObject, LocalRelation}
 import org.apache.spark.sql.types.{BooleanType, ObjectType, StructField, StructType}
 import org.apache.spark.util.Utils
+import scala.collection.mutable
 
 /**
  * A factory for constructing encoders that convert objects and primitives to and from the
@@ -268,8 +269,15 @@ case class ExpressionEncoder[T](
   @transient
   private lazy val inputRow = new GenericInternalRow(1)
 
+  val references: mutable.ArrayBuffer[Any] = new mutable.ArrayBuffer[Any]()
+
+  references += UserTaskMetrics.createMetric("Safe User Defined Sum Metrics 1")
+  references += UserTaskMetrics.createMetric("Safe User Defined Sum Metrics 2")
+  references += UserTaskMetrics.createMetric("Safe User Defined Sum Metrics 3")
+  references += UserTaskMetrics.createMetric("Safe User Defined Sum Metrics 4")
+
   @transient
-  private lazy val constructProjection = GenerateSafeProjection.generate(deserializer :: Nil)
+  private lazy val constructProjection = GenerateSafeProjection.generate(deserializer :: Nil, references)
 
   /**
    * Returns a new set (with unique ids) of [[NamedExpression]] that represent the serialized form
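To make the aggregation concrete, a small worked example of stringValue (the values and the metric name are illustrative): per-task updates arrive from the listener as "name:update" strings and are reduced to a sum plus a min/median/max distribution:

    // Illustration only: three task updates for a hypothetical metric.
    val updates = Seq("rowsProcessed:3", "rowsProcessed:5", "rowsProcessed:7")
    UserTaskMetrics.stringValue(updates)
    // returns "rowsProcessed: 15 (3, 5, 7)"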
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index cbf07c93c5bee..d2b921a2d9209 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -32,7 +32,7 @@ import scala.language.existentials
 import org.apache.spark.{DebugMetrics, SparkEnv}
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.CodegenMetrics
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{InternalRow, UserTaskMetric}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
 import org.apache.spark.sql.types._
@@ -872,6 +872,15 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
   /** Generates the requested evaluator given already bound expression(s). */
   def generate(expressions: InType): OutType = create(canonicalize(expressions))
 
+  /** Generates the requested evaluator binding the given expression(s) to the inputSchema. */
+  def generate(expressions: InType, inputSchema: Seq[Attribute],
+      ref: ArrayBuffer[Any]): OutType =
+    generate(bind(expressions, inputSchema), ref)
+
+  /** Generates the requested evaluator given already bound expression(s). */
+  def generate(expressions: InType, ref: ArrayBuffer[Any]): OutType =
+    create(canonicalize(expressions))
+
   /**
    * Create a new codegen context for expression evaluator, used to store those
    * expressions that don't support codegen
@@ -922,7 +931,8 @@ object CodeGenerator extends Logging {
       classOf[UnsafeArrayData].getName,
       classOf[MapData].getName,
       classOf[UnsafeMapData].getName,
-      classOf[Expression].getName
+      classOf[Expression].getName,
+      classOf[UserTaskMetric].getName
     ))
 
     evaluator.setExtendedClass(classOf[GeneratedClass])
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
index 4d732445544a8..838c0daf460d4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions.codegen
 
+import org.apache.spark.sql.catalyst.UserTaskMetrics
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
 
@@ -52,6 +53,11 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
       expressions: Seq[Expression],
       useSubexprElimination: Boolean): MutableProjection = {
     val ctx = newCodeGenContext()
+    UserTaskMetrics.metricTermWithRegister(ctx, "userDefined1", "MutableProj User Defined Sum Metrics 1")
+    UserTaskMetrics.metricTermWithRegister(ctx, "userDefined2", "MutableProj User Defined Sum Metrics 2")
+    UserTaskMetrics.metricTermWithRegister(ctx, "userDefined3", "MutableProj User Defined Sum Metrics 3")
+    UserTaskMetrics.metricTermWithRegister(ctx, "userDefined4", "MutableProj User Defined Sum Metrics 4")
+
     val (validExpr, index) = expressions.zipWithIndex.filter {
       case (NoOp, _) => false
       case _ => true
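The same four-metric registration pattern repeats in each generator below. The string returned by metricTerm / metricTermWithRegister is the name of a field in the generated class that holds the accumulator, so generated Java can update it directly. A hedged sketch of that intended use (the add(1L) call is illustrative; this diff only creates the terms):

    // Sketch only: `ctx` is the CodegenContext of one of the generators below.
    val term = UserTaskMetrics.metricTerm(ctx, "userDefined1", "Example User Defined Sum Metrics")
    val snippet =
      s"""
         |// inside a generated Java method body
         |$term.add(1L);
       """.stripMargin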
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
index 1cef95654a17b..31efecf1dcc42 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
@@ -22,6 +22,7 @@ import java.io.ObjectInputStream
 import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
 import com.esotericsoftware.kryo.io.{Input, Output}
 
+import org.apache.spark.sql.catalyst.UserTaskMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -123,6 +124,11 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
   protected def create(ordering: Seq[SortOrder]): BaseOrdering = {
     val ctx = newCodeGenContext()
+    UserTaskMetrics.metricTermWithRegister(ctx, "userDefined1", "Ordering User Defined Sum Metrics 1")
+    UserTaskMetrics.metricTermWithRegister(ctx, "userDefined2", "Ordering User Defined Sum Metrics 2")
+    UserTaskMetrics.metricTermWithRegister(ctx, "userDefined3", "Ordering User Defined Sum Metrics 3")
+    UserTaskMetrics.metricTermWithRegister(ctx, "userDefined4", "Ordering User Defined Sum Metrics 4")
+
     val comparisons = genComparisons(ctx, ordering)
     val codeBody = s"""
       public SpecificOrdering generate(Object[] references) {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
index dcd1ed96a298e..4c902d5529a80 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions.codegen
 
+import org.apache.spark.sql.catalyst.UserTaskMetrics
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 
@@ -46,6 +47,11 @@ object GeneratePredicate extends CodeGenerator[Expression, Predicate] {
   protected def create(predicate: Expression): Predicate = {
     val ctx = newCodeGenContext()
+    UserTaskMetrics.metricTermWithRegister(ctx, "userDefined1", "Predicate User Defined Sum Metrics 1")
+    UserTaskMetrics.metricTermWithRegister(ctx, "userDefined2", "Predicate User Defined Sum Metrics 2")
+    UserTaskMetrics.metricTermWithRegister(ctx, "userDefined3", "Predicate User Defined Sum Metrics 3")
+    UserTaskMetrics.metricTermWithRegister(ctx, "userDefined4", "Predicate User Defined Sum Metrics 4")
+
     val eval = predicate.genCode(ctx)
     val codeBody = s"""
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
index b1cb6edefb852..83fb5c878bff3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.expressions.codegen
 
 import scala.annotation.tailrec
 
+import org.apache.spark.sql.catalyst.UserTaskMetrics
+import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
@@ -137,8 +139,24 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
     case _ => ExprCode("", "false", input)
   }
 
+  override def generate(
+      expressions: Seq[Expression],
+      references: ArrayBuffer[Any]): Projection = {
+    create(expressions, references)
+  }
+
+  protected def create(expressions: Seq[Expression], ref: ArrayBuffer[Any]): Projection = {
+    val ctx = newCodeGenContext()
+    UserTaskMetrics.addMetrics(ctx, ref)
+    create(expressions, ctx)
+  }
+
   protected def create(expressions: Seq[Expression]): Projection = {
     val ctx = newCodeGenContext()
+    create(expressions, ctx)
+  }
+
+  protected def create(expressions: Seq[Expression], ctx: CodegenContext): Projection = {
     val expressionCodes = expressions.zipWithIndex.map {
       case (NoOp, _) => ""
       case (e, i) =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
index 7e4c9089a2cb9..275dd83745bd9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.expressions.codegen
 
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.catalyst.UserTaskMetrics
+import scala.collection.mutable.ArrayBuffer
 
 /**
  * Generates a [[Projection]] that returns an [[UnsafeRow]].
@@ -355,14 +357,44 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
     create(canonicalize(expressions), subexpressionEliminationEnabled)
   }
 
+  override def generate(
+      expressions: Seq[Expression],
+      ref: ArrayBuffer[Any]): UnsafeProjection = {
+    create(canonicalize(expressions), ref)
+  }
+
   protected def create(references: Seq[Expression]): UnsafeProjection = {
     create(references, subexpressionEliminationEnabled = false)
   }
 
+  protected def create(
+      references: Seq[Expression],
+      ctxReferences: ArrayBuffer[Any]): UnsafeProjection = {
+    create(references, subexpressionEliminationEnabled = false, ctxReferences)
+  }
+
   private def create(
       expressions: Seq[Expression],
-      subexpressionEliminationEnabled: Boolean): UnsafeProjection = {
+      subexpressionEliminationEnabled: Boolean,
+      references: ArrayBuffer[Any]): UnsafeProjection = {
     val ctx = newCodeGenContext()
+    UserTaskMetrics.addMetrics(ctx, references)
+    create(expressions, subexpressionEliminationEnabled, ctx)
+  }
+
+  private def create(
+      expressions: Seq[Expression],
+      subexpressionEliminationEnabled: Boolean): UnsafeProjection = {
+    val ctx = newCodeGenContext()
+    create(expressions, subexpressionEliminationEnabled, ctx)
+  }
+
+  private def create(
+      expressions: Seq[Expression],
+      subexpressionEliminationEnabled: Boolean,
+      ctx: CodegenContext): UnsafeProjection = {
     val eval = createCode(ctx, expressions, subexpressionEliminationEnabled)
     val codeBody = s"""
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala
index 4aa5ec82471ec..da7ebf946a271 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions.codegen
 
+import org.apache.spark.sql.catalyst.UserTaskMetrics
 import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.Platform
@@ -156,16 +157,30 @@ object GenerateUnsafeRowJoiner extends CodeGenerator[(StructType, StructType), U
       }
     }.mkString("\n")
 
+    val ctx = newCodeGenContext()
+    UserTaskMetrics.metricTermWithRegister(ctx, "userDefined1", "UnsafeRowJoiner User Defined Sum Metrics 1")
+    UserTaskMetrics.metricTermWithRegister(ctx, "userDefined2", "UnsafeRowJoiner User Defined Sum Metrics 2")
+    UserTaskMetrics.metricTermWithRegister(ctx, "userDefined3", "UnsafeRowJoiner User Defined Sum Metrics 3")
+    UserTaskMetrics.metricTermWithRegister(ctx, "userDefined4", "UnsafeRowJoiner User Defined Sum Metrics 4")
+
     // ------------------------ Finally, put everything together  --------------------------- //
     val codeBody = s"""
        |public java.lang.Object generate(Object[] references) {
-       |  return new SpecificUnsafeRowJoiner();
+       |  return new SpecificUnsafeRowJoiner(references);
       |}
       |
       |class SpecificUnsafeRowJoiner extends ${classOf[UnsafeRowJoiner].getName} {
+      |  private Object[] references;
+      |  ${ctx.declareMutableStates()}
+      |
       |  private byte[] buf = new byte[64];
       |  private UnsafeRow out = new UnsafeRow(${schema1.size + schema2.size});
       |
+      |  public SpecificUnsafeRowJoiner(Object[] references) {
+      |    this.references = references;
+      |    ${ctx.initMutableStates()}
+      |  }
+      |
       |  public UnsafeRow join(UnsafeRow row1, UnsafeRow row2) {
       |    // row1: ${schema1.size} fields, $bitset1Words words in bitset
       |    // row2: ${schema2.size}, $bitset2Words words in bitset
@@ -197,6 +212,6 @@ object GenerateUnsafeRowJoiner extends CodeGenerator[(StructType, StructType), U
     logDebug(s"SpecificUnsafeRowJoiner($schema1, $schema2):\n${CodeFormatter.format(code)}")
 
     val c = CodeGenerator.compile(code)
-    c.generate(Array.empty).asInstanceOf[UnsafeRowJoiner]
+    c.generate(ctx.references.toArray).asInstanceOf[UnsafeRowJoiner]
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 516b9d5444d31..277fcfe06ebe7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.{broadcast, TaskContext}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{InternalRow, UserTaskMetrics}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
@@ -310,6 +310,10 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
    */
  def doCodeGen(): (CodegenContext, CodeAndComment) = {
     val ctx = new CodegenContext
+    UserTaskMetrics.metricTerm(ctx, "userDefined1", "WholeStage User Defined Sum Metrics 1")
+    UserTaskMetrics.metricTerm(ctx, "userDefined2", "WholeStage User Defined Sum Metrics 2")
+    UserTaskMetrics.metricTerm(ctx, "userDefined3", "WholeStage User Defined Sum Metrics 3")
+    UserTaskMetrics.metricTerm(ctx, "userDefined4", "WholeStage User Defined Sum Metrics 4")
     val code = child.asInstanceOf[CodegenSupport].produce(ctx, this)
     val source = s"""
       public Object generate(Object[] references) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
index 14024d6c10558..6c36cf79c6566 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.columnar
 
+import org.apache.spark.sql.catalyst.UserTaskMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -69,6 +70,11 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
   protected def create(columnTypes: Seq[DataType]): ColumnarIterator = {
     val ctx = newCodeGenContext()
+    UserTaskMetrics.metricTerm(ctx, "userDefined1", "ColAccessor User Defined Sum Metrics 1")
+    UserTaskMetrics.metricTerm(ctx, "userDefined2", "ColAccessor User Defined Sum Metrics 2")
+    UserTaskMetrics.metricTerm(ctx, "userDefined3", "ColAccessor User Defined Sum Metrics 3")
+    UserTaskMetrics.metricTerm(ctx, "userDefined4", "ColAccessor User Defined Sum Metrics 4")
+
     val numFields = columnTypes.size
     val (initializeAccessors, extractors) = columnTypes.zipWithIndex.map { case (dt, index) =>
       val accessorName = ctx.freshName("accessor")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index 4f4aaaa5026fb..05cca77f1c330 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -23,11 +23,12 @@ import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompres
 import org.apache.hadoop.mapreduce.Job
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{InternalRow, UserTaskMetrics}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
+import scala.collection.mutable
 
 /**
@@ -119,14 +120,23 @@ trait FileFormat {
     val dataReader = buildReader(
       sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
 
+    val references: mutable.ArrayBuffer[Any] = new mutable.ArrayBuffer[Any]()
+
+    references += UserTaskMetrics.createMetric("UnSafe User Defined Sum Metrics 1")
+    references += UserTaskMetrics.createMetric("UnSafe User Defined Sum Metrics 2")
+    references += UserTaskMetrics.createMetric("UnSafe User Defined Sum Metrics 3")
+    references += UserTaskMetrics.createMetric("UnSafe User Defined Sum Metrics 4")
+
     new (PartitionedFile => Iterator[InternalRow]) with Serializable {
       private val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
 
       private val joinedRow = new JoinedRow()
 
+      // Using lazy val to avoid serialization
       private lazy val appendPartitionColumns =
-        GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+        GenerateUnsafeProjection.generate(fullSchema, fullSchema, references)
 
       override def apply(file: PartitionedFile): Iterator[InternalRow] = {
         // Using local val to avoid per-row lazy val check (pre-mature optimization?...)
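Taken together, the intended flow is: createMetric registers a UserTaskMetric with the active SparkContext, the codegen changes above carry it into the generated class's references array, executor-side updates come back as ordinary accumulator updates, and UserStatsReportListener aggregates them per stage. A hedged end-to-end sketch without codegen (all names are illustrative and not part of the patch):

    // Illustration only; assumes an existing SparkSession named `spark`.
    import org.apache.spark.sql.catalyst.{UserStatsReportListener, UserTaskMetrics}

    spark.sparkContext.addSparkListener(new UserStatsReportListener)
    val metric = UserTaskMetrics.createMetric("rows seen")   // registered on the driver

    spark.sparkContext.parallelize(1 to 100, 4).foreach { _ =>
      metric.add(1L)   // the deserialized task-side copy reports its update with the task
    }
    // After the stage completes, the listener logs something like
    //   <accumulator id> : rows seen: 100 (25, 25, 25)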