chore: Improve CometExchange metrics (apache#873)
* init

* chore: Improve CometExchange metrics

* fix format

* Add data size
viirya authored Aug 28, 2024
1 parent 3d1dbb0 commit f7f0bb1
Showing 6 changed files with 69 additions and 36 deletions.
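For context on the DataFusion metrics API the Rust changes below lean on: this commit registers a new per-partition "data_size" counter next to the baseline metrics, and charges time to elapsed_compute only while a timer guard is held. The sketch below is illustrative only — `BaselineMetrics`, `Count`, `ExecutionPlanMetricsSet`, and `MetricBuilder` are the real DataFusion types used in the diff, but the `ExampleMetrics` struct and its methods are made-up names, not Comet code.

use datafusion::physical_plan::metrics::{
    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder,
};

/// Illustrative metrics holder mirroring the shape of ShuffleRepartitionerMetrics.
struct ExampleMetrics {
    baseline: BaselineMetrics,
    /// Uncompressed size of the data handled by the operator, in bytes.
    data_size: Count,
}

impl ExampleMetrics {
    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        Self {
            baseline: BaselineMetrics::new(metrics, partition),
            // Registers a counter reported under the name "data_size".
            data_size: MetricBuilder::new(metrics).counter("data_size", partition),
        }
    }

    fn record_batch_size(&self, batch_bytes: usize) {
        // Counters only accumulate; add the in-memory size of each incoming batch.
        self.data_size.add(batch_bytes);
    }

    fn timed<T>(&self, work: impl FnOnce() -> T) -> T {
        // Only the span between timer() and the guard being stopped or dropped
        // is attributed to elapsed_compute.
        let timer = self.baseline.elapsed_compute().timer();
        let result = work();
        drop(timer);
        result
    }
}

On the Scala side, CometShuffleWriteProcessor maps these native metric names ("output_rows", "data_size", "elapsed_compute") onto Spark SQL metrics, which is what the new assertions in CometNativeShuffleSuite check.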
59 changes: 51 additions & 8 deletions native/core/src/execution/datafusion/shuffle_writer.rs
@@ -32,6 +32,7 @@ use arrow::{datatypes::*, ipc::writer::StreamWriter};
use async_trait::async_trait;
use bytes::Buf;
use crc32fast::Hasher;
use datafusion::physical_plan::metrics::Time;
use datafusion::{
arrow::{
array::*,
@@ -239,14 +240,19 @@ impl PartitionBuffer {
}

/// Appends all rows of given batch into active array builders.
fn append_batch(&mut self, batch: &RecordBatch) -> Result<isize> {
fn append_batch(&mut self, batch: &RecordBatch, time_metric: &Time) -> Result<isize> {
let columns = batch.columns();
let indices = (0..batch.num_rows()).collect::<Vec<usize>>();
self.append_rows(columns, &indices)
self.append_rows(columns, &indices, time_metric)
}

/// Appends rows of specified indices from columns into active array builders.
fn append_rows(&mut self, columns: &[ArrayRef], indices: &[usize]) -> Result<isize> {
fn append_rows(
&mut self,
columns: &[ArrayRef],
indices: &[usize],
time_metric: &Time,
) -> Result<isize> {
let mut mem_diff = 0;
let mut start = 0;

@@ -263,7 +269,10 @@ impl PartitionBuffer {
});
self.num_active_rows += end - start;
if self.num_active_rows >= self.batch_size {
let mut timer = time_metric.timer();
mem_diff += self.flush()?;
timer.stop();

mem_diff += self.init_active_if_necessary()?;
}
start = end;
@@ -592,6 +601,9 @@ struct ShuffleRepartitionerMetrics {

/// total spilled bytes during the execution of the operator
spilled_bytes: Count,

/// The original size of the spilled data. Differs from `spilled_bytes` because of compression.
data_size: Count,
}

impl ShuffleRepartitionerMetrics {
@@ -600,6 +612,7 @@ impl ShuffleRepartitionerMetrics {
baseline: BaselineMetrics::new(metrics, partition),
spill_count: MetricBuilder::new(metrics).spill_count(partition),
spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition),
data_size: MetricBuilder::new(metrics).counter("data_size", partition),
}
}
}
@@ -683,7 +696,10 @@ impl ShuffleRepartitioner {
));
}

let _timer = self.metrics.baseline.elapsed_compute().timer();
// Update data size metric
self.metrics.data_size.add(input.get_array_memory_size());

let time_metric = self.metrics.baseline.elapsed_compute();

// NOTE: in shuffle writer exec, the output_rows metric represents the
// number of rows that are written to the output data file.
@@ -754,8 +770,11 @@

// If the range of indices is not big enough, just append the rows into
// the active array builders instead of directly adding them as a record batch.
mem_diff +=
output.append_rows(input.columns(), &shuffled_partition_ids[start..end])?;
mem_diff += output.append_rows(
input.columns(),
&shuffled_partition_ids[start..end],
time_metric,
)?;
}

if mem_diff > 0 {
@@ -782,7 +801,7 @@
);

let output = &mut buffered_partitions[0];
output.append_batch(&input)?;
output.append_batch(&input, time_metric)?;
}
other => {
// this should be unreachable as long as the validation logic
@@ -798,7 +817,6 @@

/// Writes buffered shuffled record batches into Arrow IPC bytes.
async fn shuffle_write(&mut self) -> Result<SendableRecordBatchStream> {
let _timer = self.metrics.baseline.elapsed_compute().timer();
let num_output_partitions = self.num_output_partitions;
let mut buffered_partitions = self.buffered_partitions.lock().await;
let mut output_batches: Vec<Vec<u8>> = vec![vec![]; num_output_partitions];
@@ -823,14 +841,21 @@
.map_err(|e| DataFusionError::Execution(format!("shuffle write error: {:?}", e)))?;

for i in 0..num_output_partitions {
let mut timer = self.metrics.baseline.elapsed_compute().timer();

offsets[i] = output_data.stream_position()?;
output_data.write_all(&output_batches[i])?;

timer.stop();

output_batches[i].clear();

// append the partition from each spill
for spill in &output_spills {
let length = spill.offsets[i + 1] - spill.offsets[i];
if length > 0 {
let mut timer = self.metrics.baseline.elapsed_compute().timer();

let mut spill_file =
BufReader::new(File::open(spill.file.path()).map_err(|e| {
DataFusionError::Execution(format!("shuffle write error: {:?}", e))
@@ -839,15 +864,22 @@
std::io::copy(&mut spill_file.take(length), &mut output_data).map_err(|e| {
DataFusionError::Execution(format!("shuffle write error: {:?}", e))
})?;

timer.stop();
}
}
}
let mut timer = self.metrics.baseline.elapsed_compute().timer();
output_data.flush()?;
timer.stop();

// add one extra offset at the end to ease partition length computation
offsets[num_output_partitions] = output_data
.stream_position()
.map_err(|e| DataFusionError::Execution(format!("shuffle write error: {:?}", e)))?;

let mut timer = self.metrics.baseline.elapsed_compute().timer();

let mut output_index =
BufWriter::new(File::create(index_file).map_err(|e| {
DataFusionError::Execution(format!("shuffle write error: {:?}", e))
@@ -859,6 +891,8 @@
}
output_index.flush()?;

timer.stop();

let used = self.reservation.size();
self.reservation.shrink(used);

@@ -878,6 +912,10 @@
self.metrics.spill_count.value()
}

fn data_size(&self) -> usize {
self.metrics.data_size.value()
}

async fn spill(&self) -> Result<usize> {
log::debug!(
"ShuffleRepartitioner spilling shuffle data of {} to disk while inserting ({} time(s) so far)",
@@ -891,6 +929,8 @@
return Ok(0);
}

let mut timer = self.metrics.baseline.elapsed_compute().timer();

let spillfile = self
.runtime
.disk_manager
@@ -902,6 +942,8 @@
)
.await?;

timer.stop();

let mut spills = self.spills.lock().await;
let used = self.reservation.size();
self.metrics.spill_count.add(1);
@@ -955,6 +997,7 @@ impl Debug for ShuffleRepartitioner {
.field("memory_used", &self.used())
.field("spilled_bytes", &self.spilled_bytes())
.field("spilled_count", &self.spill_count())
.field("data_size", &self.data_size())
.finish()
}
}
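The shuffle_write and spill changes above drop the old function-wide `let _timer = self.metrics.baseline.elapsed_compute().timer();` and instead start and stop a timer around each write, spill-file copy, and flush, so only that I/O is charged to elapsed_compute. A minimal sketch of the pattern, assuming a `Time` metric; the helper name and signature are illustrative, not Comet code:

use std::io::Write;

use datafusion::physical_plan::metrics::Time;

/// Illustrative helper: measure only the actual write, not the surrounding bookkeeping.
fn write_partition(
    elapsed_compute: &Time,
    out: &mut impl Write,
    bytes: &[u8],
) -> std::io::Result<()> {
    let mut timer = elapsed_compute.timer();
    out.write_all(bytes)?;
    timer.stop(); // stop explicitly so the work below is not measured

    // ... untimed bookkeeping (clearing buffers, recording offsets) ...
    Ok(())
}

Because the returned `ScopedTimerGuard` also stops when it is dropped, an early `?` return inside the timed region still records the elapsed time; the explicit `stop()` just narrows the measured span.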
CometCollectLimitExec.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, SinglePartition}
import org.apache.spark.sql.comet.execution.shuffle.{CometShuffledBatchRDD, CometShuffleExchangeExec}
import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.{METRIC_NATIVE_TIME_DESCRIPTION, METRIC_NATIVE_TIME_NAME}
import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan, UnaryExecNode, UnsafeRowSerializer}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -56,8 +55,6 @@ case class CometCollectLimitExec(
SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
override lazy val metrics: Map[String, SQLMetric] = Map(
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
METRIC_NATIVE_TIME_NAME ->
SQLMetrics.createNanoTimingMetric(sparkContext, METRIC_NATIVE_TIME_DESCRIPTION),
"numPartitions" -> SQLMetrics.createMetric(
sparkContext,
"number of partitions")) ++ readMetrics ++ writeMetrics
CometTakeOrderedAndProjectExec.scala
@@ -25,10 +25,8 @@ import org.apache.spark.serializer.Serializer
import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, SortOrder}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.comet.execution.shuffle.{CometShuffledBatchRDD, CometShuffleExchangeExec}
import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.{METRIC_NATIVE_TIME_DESCRIPTION, METRIC_NATIVE_TIME_NAME}
import org.apache.spark.sql.execution.{SparkPlan, TakeOrderedAndProjectExec, UnaryExecNode, UnsafeRowSerializer}
import org.apache.spark.sql.execution.metric.{SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
import org.apache.spark.sql.vectorized.ColumnarBatch

import org.apache.comet.serde.QueryPlanSerde.exprToProto
@@ -57,8 +55,6 @@ case class CometTakeOrderedAndProjectExec(
SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
override lazy val metrics: Map[String, SQLMetric] = Map(
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
METRIC_NATIVE_TIME_NAME ->
SQLMetrics.createNanoTimingMetric(sparkContext, METRIC_NATIVE_TIME_DESCRIPTION),
"numPartitions" -> SQLMetrics.createMetric(
sparkContext,
"number of partitions")) ++ readMetrics ++ writeMetrics
CometWindowExec.scala
@@ -21,9 +21,8 @@ package org.apache.spark.sql.comet

import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, NamedExpression, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.{METRIC_NATIVE_TIME_DESCRIPTION, METRIC_NATIVE_TIME_NAME}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

import com.google.common.base.Objects

@@ -49,17 +48,9 @@ case class CometWindowExec(

override def nodeName: String = "CometWindowExec"

private lazy val writeMetrics =
SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
private lazy val readMetrics =
SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
override lazy val metrics: Map[String, SQLMetric] = Map(
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
METRIC_NATIVE_TIME_NAME ->
SQLMetrics.createNanoTimingMetric(sparkContext, METRIC_NATIVE_TIME_DESCRIPTION),
"numPartitions" -> SQLMetrics.createMetric(
sparkContext,
"number of partitions")) ++ readMetrics ++ writeMetrics
"numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions"))

override def outputOrdering: Seq[SortOrder] = child.outputOrdering

CometShuffleExchangeExec.scala
@@ -39,7 +39,6 @@ import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.comet.{CometExec, CometMetricNode, CometPlan}
import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.{METRIC_NATIVE_TIME_DESCRIPTION, METRIC_NATIVE_TIME_NAME}
import org.apache.spark.sql.comet.shims.ShimCometShuffleWriteProcessor
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeLike, ShuffleOrigin}
@@ -78,8 +77,6 @@ case class CometShuffleExchangeExec(
SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
override lazy val metrics: Map[String, SQLMetric] = Map(
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
METRIC_NATIVE_TIME_NAME ->
SQLMetrics.createNanoTimingMetric(sparkContext, METRIC_NATIVE_TIME_DESCRIPTION),
"numPartitions" -> SQLMetrics.createMetric(
sparkContext,
"number of partitions")) ++ readMetrics ++ writeMetrics
@@ -221,9 +218,6 @@ case class CometShuffleExchangeExec(
}

object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec {
val METRIC_NATIVE_TIME_NAME = "shuffleNativeTotalTime"
val METRIC_NATIVE_TIME_DESCRIPTION = "shuffle native code time"

def prepareShuffleDependency(
rdd: RDD[ColumnarBatch],
outputAttributes: Seq[Attribute],
@@ -468,6 +462,7 @@
mapId: Long,
mapIndex: Int,
context: TaskContext): MapStatus = {
val metricsReporter = createMetricsReporter(context)
val shuffleBlockResolver =
SparkEnv.get.shuffleManager.shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver]
val dataFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
Expand All @@ -483,11 +478,13 @@ class CometShuffleWriteProcessor(
// Maps native metrics to SQL metrics
val nativeSQLMetrics = Map(
"output_rows" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN),
"elapsed_compute" -> metrics("shuffleNativeTotalTime"))
"data_size" -> metrics("dataSize"),
"elapsed_compute" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME))
val nativeMetrics = CometMetricNode(nativeSQLMetrics)

// Getting rid of the fake partitionId
val newInputs = inputs.asInstanceOf[Iterator[_ <: Product2[Any, Any]]].map(_._2)

val cometIter = CometExec.getCometIterator(
Seq(newInputs.asInstanceOf[Iterator[ColumnarBatch]]),
nativePlan,
Expand All @@ -512,8 +509,8 @@ class CometShuffleWriteProcessor(
})
.toArray

// Update Spark metrics from native metrics
metrics("dataSize") += Files.size(tempDataFilePath)
// Total bytes written by the native shuffle writer
metricsReporter.incBytesWritten(Files.size(tempDataFilePath))

// commit
shuffleBlockResolver.writeMetadataFileAndCommit(
CometNativeShuffleSuite.scala
@@ -169,6 +169,15 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper

assert(metrics.contains("shuffleRecordsWritten"))
assert(metrics("shuffleRecordsWritten").value == 5L)

assert(metrics.contains("shuffleBytesWritten"))
assert(metrics("shuffleBytesWritten").value > 0)

assert(metrics.contains("dataSize"))
assert(metrics("dataSize").value > 0L)

assert(metrics.contains("shuffleWriteTime"))
assert(metrics("shuffleWriteTime").value > 0L)
}
}

